zoukankan      html  css  js  c++  java
  • sharding-jdbc源码解析

    参考博客:https://cloud.tencent.com/developer/article/1529692

     

     看sharding-jdbc支持XA协议重点看下面的代码

     sharding-transaction-xa-atomikos模块中主要是原生的atomikos的配置,atomikos事务配置都是在transactions.properties中进行配置的

    sharding-transaction模块由sharding-transaction-core,sharding-transaction-2pc和sharding-transaction-base这3个子模块组成。

    Apache ShardingSphere(Incubating)能够自动将XADataSource作为数据库驱动的数据源接入XA事务管理器。而针对于使用DataSource作为数据库驱动的应用,用户也无需改变其编码以及配置,Apache ShardingSphere(Incubating)通过自动适配的方式,在中间件内部将其转化为支持XA协议的XADataSource和XAConnection,并将其作为XA资源注册到底层的XA事务管理器中。

    ShardingSphere还会实现XA协议的recovery部分,即在事务处理器出现崩溃的情况时,可以有能力提供in-doubt transactions来实现事务恢复。

    不支持informix数据库atomikosTransactionManager

     

     sharding-jdbc支持事务上面的两个注解是需要的

     @Transactional表示底层使用spring的事务管理,spring底层的事务管理器使用默认的com.atomikos.icatch.jta.UserTransactionManager

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
     
     
        <!--==========针对两个库,各配置一个AtomikosDataSourceBean,底层都使用MysqlXADataSource=====================-->
        <!--配置数据源db_user-->
        <bean id="db_user" class="com.atomikos.jdbc.AtomikosDataSourceBean"
              init-method="init" destroy-method="close">
            <property name="uniqueResourceName" value="ds1" />
            <property name="xaDataSourceClassName"
                      value="com.mysql.cj.jdbc.MysqlXADataSource" />
            <property name="xaProperties">
                <props>
                    <prop key="url">jdbc:mysql://localhost:3306/db_user?serverTimezone=UTC</prop>
                    <prop key="user">root</prop>
                    <prop key="password">123456</prop>
                </props>
            </property>
        </bean>
     
        <!--配置数据源db_account-->
        <bean id="db_account" class="com.atomikos.jdbc.AtomikosDataSourceBean"
              init-method="init" destroy-method="close">
            <property name="uniqueResourceName" value="ds2" />
            <property name="xaDataSourceClassName"
                      value="com.mysql.cj.jdbc.MysqlXADataSource" />
            <property name="xaProperties">
                <props>
                    <prop key="url">jdbc:mysql://localhost:3306/db_account?serverTimezone=UTC</prop>
                    <prop key="user">root</prop>
                    <prop key="password">123456</prop>
                </props>
            </property>
        </bean>
     
        <!--=============针对两个数据源,各配置一个SqlSessionFactoryBean============ -->
        <bean id="ssf_user" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="db_user" />
        </bean>
     
        <bean id="ssf_account" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="db_account" />
        </bean>
     
        <!--=============针对两个SqlSessionFactoryBean,各配置一个MapperScannerConfigurer============ -->
        <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
            <property name="sqlSessionFactoryBeanName" value="ssf_user"/>
            <!--指定com.tianshouzhi.atomikos.mappers.db_user包下的UserMapper接口使用ssf_user获取底层数据库连接-->
            <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.db_user"/>
        </bean>
     
        <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
            <property name="sqlSessionFactoryBeanName" value="ssf_account"/>
            <!--指定com.tianshouzhi.atomikos.mappers.ds_account包下的AccountMapper接口使用ssf_account获取底层数据库连接-->
            <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.ds_account"/>
        </bean>
     
        <!--================配置atomikos事务管理器========================-->
        <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
              destroy-method="close">
            <property name="forceShutdown" value="false"/>
        </bean>
     
        <!--============配置spring的JtaTransactionManager,底层委派给atomikos进行处理===============-->
        <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
            <property name="transactionManager" ref="atomikosTransactionManager"/>
        </bean>
     
        <!--配置spring声明式事务管理器-->
        <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
     
        <bean id="jtaService" class="com.tianshouzhi.atomikos.JTAService"/>
    </beans>

    第二个注解 @ShardingTransactionType(TransactionType.XA)也是需要的,改注解sharding-jdbc将一般的数据源拦截成XADatasource,将一般的事务管理器封装成XA的事务管理器atomikosTransactionManager

    这里的原理在于

    https://blog.csdn.net/liu1390910/article/details/94554356

     

    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    
    com.atomikos.icatch.serial_jta_transactions = false
    com.atomikos.icatch.default_jta_timeout = 300000
    com.atomikos.icatch.max_actives = 10000
    com.atomikos.icatch.checkpoint_interval = 50000
    com.atomikos.icatch.enable_logging = true
    com.atomikos.icatch.log_base_name = xa_tx
    com.atomikos.icatch.log_base_dir = ./logs

    atomikos 创建数据源,报Max number of active transactions

     
    技术小美 2017-11-15 13:25:00 浏览1828
     
    在使用atomikos 事务管理中,当并发数超过50的时候会产生异常如下:
    java.lang.IllegalStateException:Max number of active transactions reched:50

    原因:

    atomikos的默认配置中 transactions.properties中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    # SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
    # THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
    # UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
     
    # Required: factory implementation class of the transaction core.
    # NOTE: there is no default for this, so it MUST be specified! 
    com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
     
             
    # Set base name of file where messages are output 
    # (also known as the 'console file').
    #
    # com.atomikos.icatch.console_file_name = tm.out
     
    # Size limit (in bytes) for the console file;
    # negative means unlimited.
    #
    # com.atomikos.icatch.console_file_limit=-1
     
    # For size-limited console files, this option
    # specifies a number of rotating files to 
    # maintain.
    #
    # com.atomikos.icatch.console_file_count=1
     
    # Set the number of log writes between checkpoints
    #
    # com.atomikos.icatch.checkpoint_interval=500
     
    # Set output directory where console file and other files are to be put
    # make sure this directory exists!
    #
    # com.atomikos.icatch.output_dir = ./
     
    # Set directory of log files; make sure this directory exists!
    #
    # com.atomikos.icatch.log_base_dir = ./
     
    # Set base name of log file
    this name will be  used as the first part of 
    # the system-generated log file name
    #
    # com.atomikos.icatch.log_base_name = tmlog
     
    # Set the max number of active local transactions 
    # or -1 for unlimited.
    #
    # com.atomikos.icatch.max_actives = 50 (原因)
     
    # Set the default timeout (in milliseconds) for local transactions
    #
    # com.atomikos.icatch.default_jta_timeout = 10000
     
    # Set the max timeout (in milliseconds) for local transactions
    #
    # com.atomikos.icatch.max_timeout = 300000
     
    # The globally unique name of this transaction manager process
    # override this value with a globally unique name
    #
    # com.atomikos.icatch.tm_unique_name = tm
         
    # Do we want to use parallel subtransactions? JTA's default
    # is NO for J2EE compatibility
    #
    # com.atomikos.icatch.serial_jta_transactions=true
                         
    # If you want to do explicit resource registration then
    # you need to set this value to false.
    #
    # com.atomikos.icatch.automatic_resource_registration=true  
         
    # Set this to WARN, INFO or DEBUG to control the granularity
    # of output to the console file.
    #
    # com.atomikos.icatch.console_log_level=WARN
         
    # Do you want transaction logging to be enabled or not?
    # If set to false, then no logging overhead will be done
    # at the risk of losing data after restart or crash.
    #
    # com.atomikos.icatch.enable_logging=true
     
    # Should two-phase commit be done in (multi-)threaded mode or not?
    # Set this to false if you want commits to be ordered according
    # to the order in which resources are added to the transaction.
    #
    # NOTE: threads are reused on JDK 1.5 or higher. 
    # For JDK 1.4, thread reuse is enabled as soon as the 
    # concurrent backport is in the classpath - see 
    # http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
    #
    # com.atomikos.icatch.threaded_2pc=false
     
    # Should shutdown of the VM trigger shutdown of the transaction core too?
    #
    # com.atomikos.icatch.force_shutdown_on_vm_exit=false

    修改默认配置中的:

    1
    com.atomikos.icatch.max_actives = 50------改为更大就可以解决
     
    http://www.voidcn.com/article/p-odlbtmrm-boe.html

     使用atomikos时,事务默认超时时间是100000毫秒,超过这个时间,提交事务就会抛出异常com.atomikos.icatch.RollbackException: Prepare: NO vote。

        今天总算通过bing找到了答案,记录一下。

        在网上很多说的要设置com.atomikos.icatch.max_timeout和com.atomikos.icatch.default_jta_timeout,居然都没说怎么设置,集成Spring的情况下,第一时间想到在bean的属性里配置,结果没找到(default_jta_timeout可以在org.springframework.transaction.jta.JtaTransactionManager中找到defaultTimeOut属性与之匹配,但是max_timeout没有找到,官方文档(https://www.atomikos.com/Documentation/JtaProperties)也没说,只是说设置max_timeout和UserTransaction.setTransactionTimeout()是一个意思,结果我配置了半天,没有效果)。最后搞明白了,需要在classpath下建一个jta.properties(或者transactions.properties)文件(事务管理器的配置),来配置事务相关属性,如下是我的jta.properties。

        PS:default_jta_timeout表示开启事务时,默认的超时时间,max_timeout表示最大的超时时间,0表示无限时间。如果default_jta_timeout设置的值超过了max_timeout,会自动将超时时间截断,使用max_timeout的值(日志会打印出来)。

     然后,Spring的配置文件

    <!-- Atomikos 事务管理器配置 -->
        <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
            init-method="init" destroy-method="close">
            <!-- <property name="startupTransactionService" value="false" /> -->
            <!-- close()时是否强制终止事务 -->
            <property name="forceShutdown" value="false" />
        </bean>
    
        <!-- Atomikos UserTransaction配置 -->
        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"></bean>
    
        <!-- JTA事务管理器 -->
        <bean id="txManager" class="org.springframework.transaction.jta.JtaTransactionManager">
            <property name="transactionManager">
                <ref bean="atomikosTransactionManager" />
            </property>
            <property name="userTransaction">
                <ref bean="atomikosUserTransaction" />
            </property>
        </bean>
        <tx:annotation-driven transaction-manager="txManager" proxy-target-class="true" />

     为XA和非XA提供内置的JDBC适配器,所有不再需要配置多余的东西

     

     sharding-transaction-core主要的功能在于ShardingTransactionManager类以spi扩展的是是哪些事务类型来生成对于的事务管理器,例如当前是XA的atomitx事务管理器,那么这里ShardingTransactionManager对于的事务管理器就是com.atomikos.icatch.jta.UserTransactionManager

     

     sharding-transaction-spring模块就是扫描断言的,当你的方法配置了@ShardingTransactionType (TransactionType.XA)注解之后,harding-transaction-spring模块会拦截到改注解,通过Threadlocal切换当前事务类型,适用于Sharding-JDBC。例如:TransactionTypeHolder.set (TransactionType.XA),代码如下

     Apache ShardingSphere(Incubating)官方目前实现了基于Atomikos和Bitronix的SPI,并且邀请了Radhat JBoss的XA事务引擎Narayana [https://github.com/jbosstm/narayana] 开发团队实现了JBoss的SPI。用户可以自行的在Atomikos,Bitronix和Narayana间选择自己喜欢的XA事务管理器。

     sharding-transaction-xa-sp模块就是sharding-jdbc提供的模块,将外部的XAresource资源封装到事务管理器中XATransactionManager中,这里XA

  • 相关阅读:
    windows系统切换jdk,修改java_home无效情况
    Cannot instantiate interface org.springframework.context.ApplicationListener
    MySQL分组查询获取每个学生前n条分数记录(分组查询前n条记录)
    ASP.NET Web API 使用Swagger生成在线帮助测试文档,支持多个GET
    EF TO MYSQL 无法查询中文的解决方法
    HttpWebRequest post请求获取webservice void数据信息
    This implementation is not part of the Windows Platform FIPS validated cryptographic algorithms. 此实现不是 Windows 平台 FIPS 验证的加密算法的一部分 解决方案
    MySQL 5.7.13解压版安装记录 mysql无法启动教程
    C# udpclient 发送数据断网后自动连接的方法
    汽车XX网站秒杀抢购代码
  • 原文地址:https://www.cnblogs.com/kebibuluan/p/12887116.html
Copyright © 2011-2022 走看看