Transaction
基本管理接口看着很熟悉,没啥特别的
具体实现有两类,基于XA的两阶段事务模型和基于SEATA-AT的柔性事务模型
XA和SEATA-AT经典流程图
官网图比较清晰
XA事务管理模型XAShardingTransactionManager
负责对多数据源进行管理和适配,并且将相应事务的开启、提交和回滚操作委托给具体的 XA 事务管理器
public final class XAShardingTransactionManager implements ShardingTransactionManager { private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>(); private XATransactionManager xaTransactionManager; @Override public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources, final String transactionMangerType) { xaTransactionManager = XATransactionManagerLoader.getInstance().getXATransactionManager(transactionMangerType); xaTransactionManager.init(); resourceDataSources.forEach(each -> cachedDataSources.put(each.getOriginalName(), newXATransactionDataSource(databaseType, each))); } private XATransactionDataSource newXATransactionDataSource(final DatabaseType databaseType, final ResourceDataSource resourceDataSource) { String resourceName = resourceDataSource.getUniqueResourceName(); DataSource dataSource = resourceDataSource.getDataSource(); return new XATransactionDataSource(databaseType, resourceName, dataSource, xaTransactionManager); } @Override public TransactionType getTransactionType() { return TransactionType.XA; } @SneakyThrows(SystemException.class) @Override public boolean isInTransaction() { return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus(); } @Override public Connection getConnection(final String dataSourceName) throws SQLException { try { return cachedDataSources.get(dataSourceName).getConnection(); } catch (final SystemException | RollbackException ex) { throw new SQLException(ex); } } @SneakyThrows({SystemException.class, NotSupportedException.class}) @Override public void begin() { xaTransactionManager.getTransactionManager().begin(); } @SneakyThrows({SystemException.class, RollbackException.class, HeuristicMixedException.class, HeuristicRollbackException.class}) @Override public void commit() { xaTransactionManager.getTransactionManager().commit(); } @SneakyThrows(SystemException.class) @Override public void rollback() { 
xaTransactionManager.getTransactionManager().rollback(); } @Override public void close() throws Exception { for (XATransactionDataSource each : cachedDataSources.values()) { each.close(); } cachedDataSources.clear(); xaTransactionManager.close(); } }
管理的数据源也是需要支持XA协议的,看一段XATransactionDataSource中的片段
/**
 * Obtains a connection and, when the current transaction has not been enlisted yet on this
 * thread, enlists the connection's XA resource in it.
 *
 * <p>Data sources whose simple class name is listed in {@code CONTAINER_DATASOURCE_NAMES}
 * are returned as-is without any enlistment.</p>
 *
 * @return connection obtained from the wrapped data source
 * @throws SQLException if the connection cannot be obtained
 * @throws SystemException if raised by the transaction manager or transaction calls
 * @throws RollbackException if raised while enlisting the resource or registering the synchronization
 */
public Connection getConnection() throws SQLException, SystemException, RollbackException {
    String dataSourceClassName = dataSource.getClass().getSimpleName();
    if (CONTAINER_DATASOURCE_NAMES.contains(dataSourceClassName)) {
        return dataSource.getConnection();
    }
    Connection result = dataSource.getConnection();
    XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
    Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
    if (enlistedTransactions.get().contains(transaction)) {
        // Already enlisted on this thread: hand out the connection without enlisting again.
        return result;
    }
    SingleXAResource xaResource = new SingleXAResource(resourceName, xaConnection.getXAResource());
    transaction.enlistResource(xaResource);
    transaction.registerSynchronization(newEnlistmentCleanup(transaction));
    enlistedTransactions.get().add(transaction);
    return result;
}

/**
 * Creates the completion callback that forgets the given transaction before completion
 * and empties the calling thread's enlisted set after completion.
 */
private Synchronization newEnlistmentCleanup(final Transaction transaction) {
    return new Synchronization() {
        
        @Override
        public void beforeCompletion() {
            enlistedTransactions.get().remove(transaction);
        }
        
        @Override
        public void afterCompletion(final int status) {
            enlistedTransactions.get().clear();
        }
    };
}
XA连接需要支持XA的底层数据库,以MySQL为例看下数据库的支持
/**
 * XA connection wrapper for MySQL.
 *
 * <p>Unwraps the driver-level connection that matches the XA data source class (MySQL
 * Connector/J 5 or 8 class names) and passes it to the data source's
 * {@code wrapConnection} method, which is invoked reflectively.</p>
 */
@RequiredArgsConstructor
public final class MySQLXAConnectionWrapper implements XAConnectionWrapper {
    
    private static final String MYSQL_XA_DATASOURCE_5 = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
    
    private static final String MYSQL_XA_DATASOURCE_8 = "com.mysql.cj.jdbc.MysqlXADataSource";
    
    /**
     * Wraps a connection into an XA connection using the given XA data source.
     *
     * @param xaDataSource XA data source whose class declares {@code wrapConnection(Connection)}
     * @param connection connection to unwrap and hand to the XA data source
     * @return XA connection returned by {@code wrapConnection}
     */
    @SneakyThrows(ReflectiveOperationException.class)
    @Override
    public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
        Class<?> xaDataSourceClass = xaDataSource.getClass();
        Connection physicalConnection = unwrapPhysicalConnection(xaDataSourceClass.getName(), connection);
        Method wrapConnectionMethod = xaDataSourceClass.getDeclaredMethod("wrapConnection", Connection.class);
        // The method is invoked reflectively; setAccessible lifts access checks on it.
        wrapConnectionMethod.setAccessible(true);
        return (XAConnection) wrapConnectionMethod.invoke(xaDataSource, physicalConnection);
    }
    
    /**
     * Unwraps the connection to the driver connection type that belongs to the XA data source class.
     *
     * @throws UnsupportedOperationException if the XA data source class is neither of the two known ones
     */
    @SneakyThrows({SQLException.class, ClassNotFoundException.class})
    private Connection unwrapPhysicalConnection(final String xaDataSourceClassName, final Connection connection) {
        final String physicalConnectionClassName;
        switch (xaDataSourceClassName) {
            case MYSQL_XA_DATASOURCE_5:
                physicalConnectionClassName = "com.mysql.jdbc.Connection";
                break;
            case MYSQL_XA_DATASOURCE_8:
                physicalConnectionClassName = "com.mysql.cj.jdbc.JdbcConnection";
                break;
            default:
                throw new UnsupportedOperationException(String.format("Cannot support xa datasource: `%s`", xaDataSourceClassName));
        }
        return (Connection) connection.unwrap(Class.forName(physicalConnectionClassName));
    }
}
参考官网:
XAShardingTransactionManager 将数据库连接所对应的 XAResource 注册到当前 XA 事务中之后,事务管理器会在此阶段发送 XAResource.start 命令至数据库。数据库在收到 XAResource.end 命令之前的所有 SQL 操作,会被标记为 XA 事务。
XAShardingTransactionManager 在接收到接入端的提交命令后,会委托实际的 XA 事务管理器进行提交动作。事务管理器会收集当前线程中所有已注册的 XAResource,并发送 XAResource.end 指令,用以标记此 XA 事务的边界。接着会依次发送 prepare 指令,收集所有参与 XAResource 的投票。若所有 XAResource 的反馈结果均为正确,则调用 commit 指令进行最终提交;若有任意 XAResource 的反馈结果不正确,则调用 rollback 指令进行回滚。在事务管理器发出提交指令后,任何 XAResource 产生的异常都会通过恢复日志进行重试,以保证提交阶段的操作原子性和数据强一致性。
public final class SeataATShardingTransactionManager implements ShardingTransactionManager { private final Map<String, DataSource> dataSourceMap = new HashMap<>(); private final String applicationId; private final String transactionServiceGroup; private final boolean enableSeataAT; public SeataATShardingTransactionManager() { FileConfiguration config = new FileConfiguration("seata.conf"); enableSeataAT = config.getBoolean("sharding.transaction.seata.at.enable", true); applicationId = config.getConfig("client.application.id"); transactionServiceGroup = config.getConfig("client.transaction.service.group", "default"); } @Override public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources, final String transactionMangerType) { if (enableSeataAT) { initSeataRPCClient(); resourceDataSources.forEach(each -> dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()))); } } private void initSeataRPCClient() { Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file."); TMClient.init(applicationId, transactionServiceGroup); RMClient.init(applicationId, transactionServiceGroup); } @Override public TransactionType getTransactionType() { return TransactionType.BASE; } @Override public boolean isInTransaction() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); return null != RootContext.getXID(); } @Override public Connection getConnection(final String dataSourceName) throws SQLException { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); return dataSourceMap.get(dataSourceName).getConnection(); } @Override @SneakyThrows(TransactionException.class) public void begin() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); GlobalTransaction globalTransaction = GlobalTransactionContext.getCurrentOrCreate(); globalTransaction.begin(); 
SeataTransactionHolder.set(globalTransaction); } @Override @SneakyThrows(TransactionException.class) public void commit() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); try { SeataTransactionHolder.get().commit(); } finally { SeataTransactionHolder.clear(); RootContext.unbind(); } } @Override @SneakyThrows(TransactionException.class) public void rollback() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); try { SeataTransactionHolder.get().rollback(); } finally { SeataTransactionHolder.clear(); RootContext.unbind(); } } @Override public void close() { dataSourceMap.clear(); SeataTransactionHolder.clear(); TmRpcClient.getInstance().destroy(); RmRpcClient.getInstance().destroy(); } }
Seata有什么不同
直观来看,多了org.apache.shardingsphere.transaction.base.seata.at.TransactionalSQLExecutionHook,看源码是拦截SQL执行的
这一部分都比较依赖原有事务实现机制,并对其做封装,贴下代码
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.transaction.base.seata.at; import com.google.common.base.Preconditions; import io.seata.config.FileConfiguration; import io.seata.core.context.RootContext; import io.seata.core.exception.TransactionException; import io.seata.core.rpc.netty.RmRpcClient; import io.seata.core.rpc.netty.TmRpcClient; import io.seata.rm.RMClient; import io.seata.rm.datasource.DataSourceProxy; import io.seata.tm.TMClient; import io.seata.tm.api.GlobalTransaction; import io.seata.tm.api.GlobalTransactionContext; import lombok.SneakyThrows; import org.apache.shardingsphere.infra.database.type.DatabaseType; import org.apache.shardingsphere.transaction.core.ResourceDataSource; import org.apache.shardingsphere.transaction.core.TransactionType; import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.Collection; import java.util.HashMap; import java.util.Map; /** * Seata AT sharding transaction manager. 
*/ public final class SeataATShardingTransactionManager implements ShardingTransactionManager { private final Map<String, DataSource> dataSourceMap = new HashMap<>(); private final String applicationId; private final String transactionServiceGroup; private final boolean enableSeataAT; public SeataATShardingTransactionManager() { FileConfiguration config = new FileConfiguration("seata.conf"); enableSeataAT = config.getBoolean("sharding.transaction.seata.at.enable", true); applicationId = config.getConfig("client.application.id"); transactionServiceGroup = config.getConfig("client.transaction.service.group", "default"); } @Override public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources, final String transactionMangerType) { if (enableSeataAT) { initSeataRPCClient(); resourceDataSources.forEach(each -> dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()))); } } private void initSeataRPCClient() { Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file."); TMClient.init(applicationId, transactionServiceGroup); RMClient.init(applicationId, transactionServiceGroup); } @Override public TransactionType getTransactionType() { return TransactionType.BASE; } @Override public boolean isInTransaction() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); return null != RootContext.getXID(); } @Override public Connection getConnection(final String dataSourceName) throws SQLException { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); return dataSourceMap.get(dataSourceName).getConnection(); } @Override @SneakyThrows(TransactionException.class) public void begin() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); GlobalTransaction globalTransaction = GlobalTransactionContext.getCurrentOrCreate(); globalTransaction.begin(); 
SeataTransactionHolder.set(globalTransaction); } @Override @SneakyThrows(TransactionException.class) public void commit() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); try { SeataTransactionHolder.get().commit(); } finally { SeataTransactionHolder.clear(); RootContext.unbind(); } } @Override @SneakyThrows(TransactionException.class) public void rollback() { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); try { SeataTransactionHolder.get().rollback(); } finally { SeataTransactionHolder.clear(); RootContext.unbind(); } } @Override public void close() { dataSourceMap.clear(); SeataTransactionHolder.clear(); TmRpcClient.getInstance().destroy(); RmRpcClient.getInstance().destroy(); } }