zoukankan      html  css  js  c++  java
  • MQ确认机制之事务机制,tx

    一:介绍

    1.介绍

      在前面的说的模式中会出现一个问题。

      就是生产者将消息发送出去到底有没有到达rabbitMq,默认情况下是不知道。

      有两种解决方式。

        AMQP实现事务机制

        Confirm机制。

      这里先说明第一种实现方式。

    2.事务机制

      txSelect:用于将当前的channel设置成transation模式。

      txCommit:用于提交事务

      txRollback:回滚事务

    3.缺点

      很耗时,降低吞吐量。

    二:程序

    1.生产者

     1 package com.mq.TxCommit;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 import java.util.Map;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 public class Send {
    11     private static final String QUEUE_NAME="test_queue_tx";
    12     public static void main(String[] args)throws Exception{
    13         Connection connection= ConnectionUtil.getConnection();
    14         Channel channel=connection.createChannel();
    15         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    16         String msg="tx msg";
    17         try {
    18             channel.txSelect();
    19             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    20             System.out.println("msg:"+msg);
    21             //营造一个可以回退的语句
    22             int a=1/0;
    23             channel.txCommit();
    24         }catch (Exception e){
    25             channel.txRollback();
    26         }
    27     }
    28 }

    2.消费者

     1 package com.mq.TxCommit;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Receive {
     9     private static final String QUEUE_NAME="test_queue_tx";
    10     public static void main(String[] args)throws Exception {
    11         Connection connection = ConnectionUtil.getConnection();
    12         Channel channel = connection.createChannel();
    13         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    14         channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    15             @Override
    16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    17                 System.out.println(new String(body,"utf-8"));
    18             }
    19         });
    20     }
    21 }

    3.现象

      就不粘图了,用文字说明一番。

      如果没有粘贴可以造成异常的语句,就可以收到消息;如果有,就收不到消息。

        

  • 相关阅读:
    Azure HPC Pack Cluster添加辅助节点
    Azure HPC Pack 辅助节点模板配置
    Azure HPC Pack配置管理系列(PART6)
    Windows HPC Pack 2012 R2配置
    Azure HPC Pack 节点提升成域控制器
    Azure HPC Pack VM 节点创建和配置
    Azure HPC Pack 部署必要条件准备
    Azure HPC Pack 基础拓扑概述
    Azure VM 性能计数器配置
    Maven私仓配置
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8627301.html
Copyright © 2011-2022 走看看