所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
在上一篇文章中,展示了springboot如何引入并使用seata来实现分布式事务的,基本使用以后接下来将开始进行源代码的阅读。毕竟阅读源代码总是比阅读文档令人有兴趣一点,而且了解他人的编码思路似乎也算是一个跨时空的交流?
作为源码篇的开篇, 将会阅读springboot引入seata进行自动配置的部分。
自动配置类SeataAutoConfiguration
seata的自动配置类命名非常的直接,就叫做:SeataAutoConfiguration,我们打开这个类
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties") @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) @Configuration @EnableConfigurationProperties({SeataProperties.class}) public class SeataAutoConfiguration { }
首先,@Configuration表明,SeataAutoConfiguration被定义为了spring的配置类。
@ConditionalOnProperty将配置类生效条件设置为seata.enabled=true,默认值是true,所以可以开关分布式事务功能。
@EnableConfigurationProperties将配置包转成了一个SeataProperties的Bean对象来使用。
@ComponentScan扫描了一下properties包,加载了一大堆类似SeataProperties的Bean对象。
接下来阅读SeataAutoConfiguration的内部代码
@Autowired private SeataProperties seataProperties; @Bean public SpringUtils springUtils() { return new SpringUtils(); } @Bean @DependsOn({"springUtils"}) @ConditionalOnMissingBean(GlobalTransactionScanner.class) public GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup()); }
SpringUtils是一个实现了ApplicationContextAware的工具包,可以便捷地从容器当中getBean操作。
自动配置的核心点落在了下面的一个Bean,GlobalTransactionScanner。
我们看到构造这个Bean非常的简单,构造方法只需要一个applicationId和txServiceGroup。
applicationId: 就是spring.application.name=你定义的当前应用的名字,例如:userService
txServiceGroup: 就是以applicationId 加上 -seata-service-group命名的,例如:userService-seata-service-group。如果版本较低的话,那时候可能还不叫seata而是fescar,因此默认命名就是以fescar为后缀。
new了一个GlobalTransactionScanner对象,SeataAutoConfiguration这个自动配置类的作用就结束了。有点草率?是的,不过不影响。毕竟SeataAutoConfiguration只是做了一个启动引导的作用。
GlobalTransactionScanner主体逻辑
既然核心点落在GlobalTransactionScanner这个类,我们继续关注它。看这个名字其实就可以猜测到一点它的作用,扫描@GlobalTransactional这个注解,并对代理方法进行拦截增强事务的功能。
要了解这个类,不得不先阅读一下它的UML图
可以看到,GlobalTransactionScanner主要有4个点值得关注:
1)Disposable接口,表达了spring容器销毁的时候会进行一些操作
2)InitializingBean接口,表达了初始化的时候会进行一些操作
3)AbstractAutoProxyCreator表示它会对spring容器中的Bean进行切面增强,也就是我们上面的拦截事务增强的猜测。
4)ApplicationContextAware表示可以拿到spring容器
这里我们稍微关注一下这4个的执行顺序:
ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean
我们重点关注一下InitializingBean和AbstractAutoProxyCreator的内容
InitializingBean
@Override public void afterPropertiesSet() { if (disableGlobalTransaction) { return; } initClient(); }
初始化Seata的Client端的东西,Client端主要包括TransactionManager和ResourceManager。或许是为了简化吧,并没有把initClient这件事从GlobalTransactionScanner里面独立出来一个类。
跟进initClient方法
private void initClient() { if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException( "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup); } //init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup); registerSpringShutdownHook(); }
initClient逻辑并不复杂,单纯调用TMClient.init初始化TransactionManager的RPC客户端,RMClient.init初始化ResourceManager的RPC客户端。seata的RPC采用netty来实现,seata封装简化了一下使用。
TMClient比较简单,当初初始化RPC组件
public static void init(String applicationId, String transactionServiceGroup) { TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup); tmRpcClient.init(); }
我们关注一下RMClient的init方法
public static void init(String applicationId, String transactionServiceGroup) { // 获取单例对象 RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); // 设置ResourceManager的单例对象 rmRpcClient.setResourceManager(DefaultResourceManager.get()); // 添加监听器,监听Server端的消息推送 rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); // 初始化RPC rmRpcClient.init(); }
和TMClient想比,RMClient多出了一个监听Server端消息并处理的机制。也就是说TM的职责更多的是主动与Server端通信,比如:全局事务的begin、commit、rollback等。
而RM除了主动操作本地资源外,还会因为全局事务的commit、rollback等的消息推送,从而对本地资源进行相关操作。
AbstractAutoProxyCreator
GlobalTransactionScanner初始化完了TM和RM以后,我们再关注一下AbstractAutoProxyCreator,自动代理。
自动代理,它代理啥东西呢?或者说它给spring中的Bean增强了什么功能?
GlobalTransactionScanner主要扩展了AbstractAutoProxyCreator的两个方法
1)wrapIfNecessary:代理增强的前置判断处理,表示是否该Bean需要增强,如果增强的话创建代理类
2)postProcessAfterInitialization:当一个spring的Bean已经初始化完毕的时候,后置处理一些东西
wrapIfNecessary前置处理
@Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { try { synchronized (PROXYED_SET) { // 相同Bean排重 if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; // 判断是否开启TCC模式 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // TCC实现的拦截器 interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); // 判断是否存在@GlobalTransactional或者@GlobalLock注解 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (interceptor == null) { // 非TCC的拦截器 interceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor); } } // 判断当前Bean是否已经是spring的代理类了 if (!AopUtils.isAopProxy(bean)) { // 如果还不是,那么走一轮spring的代理过程即可 bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { // 如果是一个spring的代理类,那么反射获取代理类中已经存在的拦截器集合,然后添加到该集合当中 AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); for (Advisor avr : advisor) { advised.addAdvisor(0, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) {} }
wrapIfNecessary方法较长我们分步骤看看
1)isTccAutoProxy判断是否开启tcc模式,开启的话选择了TccActionInterceptor拦截器,非tcc模式选择GlobalTransactionalInterceptor拦截器,默认不开启
2)existAnnotation判断当前Bean是否有类或者接口的方法存在@GlobalTransactional或者@GlobalLock注解,如果没有则直接返回
3)isAopProxy方法是判断当前的Bean是否已经是spring的代理类了,无论是JDK动态代理还是Cglib类代理。如果是普通的Bean,走原有的生成代理逻辑即可,如果已经是代理类,那么要通过反射获取代理对象内的拦截器集合也叫做Advisor,直接添加到该集合当中。
wrapIfNecessary的方法并不复杂,但是如果对代理不是很熟悉或许对细节点会有些困惑。
postProcessAfterInitialization数据源代理
wrapIfNecessary创建了代理类,最后看看后置处理又做了啥。跟进该方法
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 判断是否数据源,且不是数据源代理 if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) { // 创建静态代理 DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean); Class<?>[] interfaces = SpringProxyUtils.getAllInterfaces(bean); // 创建动态代理类 return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes()); if (null != m) { // 如果静态代理存在目标对象的代理方法,那么调用该代理方法,从而调用目标方法 return m.invoke(dataSourceProxy, args); } else { // 如果静态代理不存在目标对象的代理方法,那么直接调用目标对象的代理方法 boolean oldAccessible = method.isAccessible(); try { method.setAccessible(true); return method.invoke(bean, args); } finally { //recover the original accessible for security reason method.setAccessible(oldAccessible); } } } }); } // 不需要处理数据源代理的,按照原有逻辑处理 return super.postProcessAfterInitialization(bean, beanName); }
这里有两大块逻辑:
1)是数据源,且需要进行数据源代理的,那么特立独行地走if内的逻辑
2)不需要数据源代理的,那么走原有逻辑
我们关注一下数据源自代理的逻辑,数据源代理或许是seata非常重要的实现之一
首先,DataSource是一个接口,DataSourceProxy对DataSource是一种实现的关系。但是,请注意!!!spring中的数据源的Bean和DataSource是实现关系,可是该Bean和DataSourceProxy并不是继承或者实现的关系,而是组合关系。
因此,DataSourceProxy作为数据源Bean的静态代理而存在,而它实现了DataSource的接口,但是很有可能并未实现Bean的一些接口。
这样一来,就很有必要创建一个动态代理类,来关联一下DataSourceProxy和Bean之间的关系了。如果DataSourceProxy代理过的方法,那么调用代理方法,如果没有就直接调用Bean中的方法。
后置处理的大体逻辑就是这样,基本上就是对数据源进行自动代理处理。
总结
本文主要是自动配置的主体逻辑,自动配置围绕着GlobalTransactionScanner这个Bean展开。核心逻辑主要是三块:
1)初始化TransactionManager和ResourceManager的RPC客户端
2) 对@GlobalTransactional和@GlobalLock注解的方法进行增强
3)数据源代理