zoukankan      html  css  js  c++  java
  • php + mysql 分布式事务(转)

    事务(Transaction)是访问并可能更新数据库中各种数据项的一个程序执行单元;

    事务应该具有4个属性:原子性、一致性、隔离性、持续性

    原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
    一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。

    隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
    持久性(durability)。持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。

    分布式事务:分布式事务的参与者、资源管理器、事务管理器等位于不用的节点上,这些不同的节点相互协作共同完成一个具有逻辑完整性的事务。

    纠正自己对mysql的一个误解,mysql从5.0开始支持XA DataSource。Connector/J 版本要使用5.0版本,5.0以下的不支持。

      XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口XA协议包括两套函数,以xa_开头的及以ax_开头的。 

      以下的函数使事务管理器可以对资源管理器进行的操作: 
      1)xa_open,xa_close:建立和关闭与资源管理器的连接。 
      2)xa_start,xa_end:开始和结束一个本地事务。 
      3)xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。 
      4)xa_recover:回滚一个已进行预提交的事务。 
      5)ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。 
      6)ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。

     

    MySQL XA分为两类,内部XA与外部XA;内部XA用于同一实例下跨多个引擎的事务,由大家熟悉的Binlog作为协调者;外部XA用于跨多MySQL实例的分 布式事务,需要应用层介入作为协调者(崩溃时的悬挂事务,全局提交还是回滚,需要由应用层决定,对应用层的实现要求较高);

     Binlog作为内部XA的协调者,在binlog中出现的内部xid,在crash recover时,由binlog负责提交。(这是因为,binlog不进行prepare, 只进行commit,因此在binlog中出现的内部xid,一定能够保证其在底层各存储引擎中已经完成prepare)。

    MySQL数据库外部XA可以用在分布式数据库代理层,实现对MySQL数据库的分布式事务支持,例如开源的代理工具:网易的DDB,淘宝的TDDL,B2B的Cobar等等。

     

     

    示例

    public function testAction(){
            $goods_id=1;
            $goods_name = "大西瓜";
            $num = 1;
            $rs_order = $this->test->createorder($goods_id,$goods_name,$num);
            $rs_goods = $this->test->deduction($goods_id,$num);
            if($rs_order['status'] =="success" && $rs_goods['status']=="success"){
                $this->test->commitdb($rs_order['XA']);
                $this->test->commitdb1($rs_goods['XA']);
            }else{
                $this->test->rollbackdb($rs_order['XA']);
                $this->test->rollbackdb1($rs_goods['XA']);
            }
            
            print_r($rs_order);
            echo "<br />";
            print_r($rs_goods);
            die("dddd");
        }
        
        public function createorder($goods_id,$goods_name,$num){
            $XA = uniqid("");
            $this->_db->query("XA START '$XA'");
            $_rs = true;
            try {
                $data = array();
                $data['order_id'] = "V".date("YmdHis");
                $data['goods_name'] = $goods_name;
                $data['goods_num'] = $num;
                $this->_db->insert("temp_orders",$data);
                $rs =  $this->_db->lastInsertId();
                if($rs){
                    $_rs = true;
                }else{
                    $_rs = false;
                }
            } catch (Exception $e) {
                $_rs = false;
            }
            $this->_db->query("XA END '$XA'");
             if($_rs){
                     $this->_db->query("XA PREPARE '$XA'");
                     return array("status"=>"success","XA"=>$XA);
             }else{
                     return array("status"=>"nosuccess","XA"=>$XA);
             }
        }
        
        public function deduction($id){
            $XA = uniqid("");
            $this->db1->query("XA START '$XA'");
            $last_rs = true;
            try {
                    $sql = "select * from temp_goods where id = '$id' and goods_num>0";
                    $rs = $this->db1->fetchRow($sql);
                    if(!empty($rs)){
                        $sql = "update temp_goods set goods_num = goods_num-1 where id = '$id'";
                        $rd = $this->db1->query($sql);
                        if($rd){
                            $last_rs = true;
                        }else{
                            $last_rs = false;
                        }
                    }else{
                            $last_rs = false;;
                    }
            } catch (Exception $e) {
                 $last_rs = false;;
            }
             $this->db1->query("XA END '$XA'");
             
             if($last_rs){
                     $this->db1->query("XA PREPARE '$XA'");
                     return array("status"=>"success","XA"=>$XA);
             }else{
                     return array("status"=>"nosuccess","XA"=>$XA);
             }
        
        }
        //提交事务!
        public function commitdb($xa){
            return $this->_db->query("XA COMMIT '$xa'");
        }
        
        //回滚事务
        public function rollbackdb($xa){
            return $this->_db->query("XA ROLLBACK '$xa'");
        }
        
        //提交事务!
        public function commitdb1($xa){
            return $this->db1->query("XA COMMIT '$xa'");
        }
        
        //回滚事务
        public function rollbackdb1($xa){
            return $this->db1->query("XA ROLLBACK '$xa'");
        }

  • 相关阅读:
    java实现倒计时
    javaweb启动时启动socket服务端
    二进制数与十六进制数之间如何互相转换
    分组查询最新时间的数据
    javaweb利用钉钉机器人向钉钉群推送消息(解决中文乱码)
    java利用钉钉机器人向钉钉群推送消息
    Java原生操作数据库(不使用框架)
    Mybatis 插入与批量插入以及多参数批量删除
    在spring中直接在类中注入dao
    web自动化测试实战之批量执行测试用例
  • 原文地址:https://www.cnblogs.com/sandea/p/5119146.html
Copyright © 2011-2022 走看看