zoukankan      html  css  js  c++  java
  • MQ确认机制之事务机制,tx

    一:介绍

    1.介绍

      在前面的说的模式中会出现一个问题。

      就是生产者将消息发送出去到底有没有到达rabbitMq,默认情况下是不知道。

      有两种解决方式。

        AMQP实现事务机制

        Confirm机制。

      这里先说明第一种实现方式。

    2.事务机制

      txSelect:用于将当前的channel设置成transation模式。

      txCommit:用于提交事务

      txRollback:回滚事务

    3.缺点

      很耗时,降低吞吐量。

    二:程序

    1.生产者

     1 package com.mq.TxCommit;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 import java.util.Map;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 public class Send {
    11     private static final String QUEUE_NAME="test_queue_tx";
    12     public static void main(String[] args)throws Exception{
    13         Connection connection= ConnectionUtil.getConnection();
    14         Channel channel=connection.createChannel();
    15         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    16         String msg="tx msg";
    17         try {
    18             channel.txSelect();
    19             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    20             System.out.println("msg:"+msg);
    21             //营造一个可以回退的语句
    22             int a=1/0;
    23             channel.txCommit();
    24         }catch (Exception e){
    25             channel.txRollback();
    26         }
    27     }
    28 }

    2.消费者

     1 package com.mq.TxCommit;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Receive {
     9     private static final String QUEUE_NAME="test_queue_tx";
    10     public static void main(String[] args)throws Exception {
    11         Connection connection = ConnectionUtil.getConnection();
    12         Channel channel = connection.createChannel();
    13         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    14         channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
    15             @Override
    16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    17                 System.out.println(new String(body,"utf-8"));
    18             }
    19         });
    20     }
    21 }

    3.现象

      就不粘图了,用文字说明一番。

      如果没有粘贴可以造成异常的语句,就可以收到消息;如果有,就收不到消息。

        

  • 相关阅读:
    idea_pyspark 环境配置
    Win7 单机Spark和PySpark安装
    Spark在Windows下的环境搭建
    linux 登陆key生成
    nginx 根据参数选择文档根目录
    系统操作日志设计(转)
    smarty、smarty格式化、smarty整数、smarty float、smarty各种转换方式、smarty日期转换等等 (转)
    Mac下面的SecureCRT(附破解方案) 更新到最新的7.3.2(转)
    nginx php-fpm 输出php错误日志
    解决PHP显示Warning和Notice等问题
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8627301.html
Copyright © 2011-2022 走看看