zoukankan      html  css  js  c++  java
  • 一、seata自动配置

    所有文章

    https://www.cnblogs.com/lay2017/p/12485081.html

    正文

    上一篇文章中,展示了springboot如何引入并使用seata来实现分布式事务的,基本使用以后接下来将开始进行源代码的阅读。毕竟阅读源代码总是比阅读文档令人有兴趣一点,而且了解他人的编码思路似乎也算是一个跨时空的交流?

    作为源码篇的开篇, 将会阅读springboot引入seata进行自动配置的部分。

    自动配置类SeataAutoConfiguration

    seata的自动配置类命名非常的直接,就叫做:SeataAutoConfiguration,我们打开这个类

    @ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
    @Configuration
    @EnableConfigurationProperties({SeataProperties.class})
    public class SeataAutoConfiguration {
        
    }

    首先,@Configuration表明,SeataAutoConfiguration被定义为了spring的配置类。

    @ConditionalOnProperty将配置类生效条件设置为seata.enabled=true,默认值是true,所以可以开关分布式事务功能。

    @EnableConfigurationProperties将配置包转成了一个SeataProperties的Bean对象来使用。

    @ComponentScan扫描了一下properties包,加载了一大堆类似SeataProperties的Bean对象。

    接下来阅读SeataAutoConfiguration的内部代码

    @Autowired
    private SeataProperties seataProperties;
    
    @Bean
    public SpringUtils springUtils() {
        return new SpringUtils();
    }
    
    @Bean
    @DependsOn({"springUtils"})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner() {return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup());
    }

    SpringUtils是一个实现了ApplicationContextAware的工具包,可以便捷地从容器当中getBean操作。

    自动配置的核心点落在了下面的一个Bean,GlobalTransactionScanner。

    我们看到构造这个Bean非常的简单,构造方法只需要一个applicationId和txServiceGroup。

    applicationId: 就是spring.application.name=你定义的当前应用的名字,例如:userService

    txServiceGroup: 就是以applicationId 加上 -seata-service-group命名的,例如:userService-seata-service-group。如果版本较低的话,那时候可能还不叫seata而是fescar,因此默认命名就是以fescar为后缀。

    new了一个GlobalTransactionScanner对象,SeataAutoConfiguration这个自动配置类的作用就结束了。有点草率?是的,不过不影响。毕竟SeataAutoConfiguration只是做了一个启动引导的作用。

    GlobalTransactionScanner主体逻辑

    既然核心点落在GlobalTransactionScanner这个类,我们继续关注它。看这个名字其实就可以猜测到一点它的作用,扫描@GlobalTransactional这个注解,并对代理方法进行拦截增强事务的功能。

    要了解这个类,不得不先阅读一下它的UML图

    可以看到,GlobalTransactionScanner主要有4个点值得关注:

    1)Disposable接口,表达了spring容器销毁的时候会进行一些操作

    2)InitializingBean接口,表达了初始化的时候会进行一些操作

    3)AbstractAutoProxyCreator表示它会对spring容器中的Bean进行切面增强,也就是我们上面的拦截事务增强的猜测。

    4)ApplicationContextAware表示可以拿到spring容器

    这里我们稍微关注一下这4个的执行顺序:

    ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean

    我们重点关注一下InitializingBean和AbstractAutoProxyCreator的内容

    InitializingBean

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            return;
        }
        initClient();
    }

    初始化Seata的Client端的东西,Client端主要包括TransactionManager和ResourceManager。或许是为了简化吧,并没有把initClient这件事从GlobalTransactionScanner里面独立出来一个类。

    跟进initClient方法

    private void initClient() {
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(
                "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
        }
        
        //init TM
        TMClient.init(applicationId, txServiceGroup);
       
        //init RM
        RMClient.init(applicationId, txServiceGroup);
      
        registerSpringShutdownHook();
    }

    initClient逻辑并不复杂,单纯调用TMClient.init初始化TransactionManager的RPC客户端,RMClient.init初始化ResourceManager的RPC客户端。seata的RPC采用netty来实现,seata封装简化了一下使用。

    TMClient比较简单,当初初始化RPC组件

    public static void init(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
        tmRpcClient.init();
    }

    我们关注一下RMClient的init方法

    public static void init(String applicationId, String transactionServiceGroup) {
        // 获取单例对象
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        // 设置ResourceManager的单例对象
        rmRpcClient.setResourceManager(DefaultResourceManager.get());
        // 添加监听器,监听Server端的消息推送
        rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
        // 初始化RPC
        rmRpcClient.init();
    }

    和TMClient想比,RMClient多出了一个监听Server端消息并处理的机制。也就是说TM的职责更多的是主动与Server端通信,比如:全局事务的begin、commit、rollback等。

    而RM除了主动操作本地资源外,还会因为全局事务的commit、rollback等的消息推送,从而对本地资源进行相关操作。

    AbstractAutoProxyCreator

    GlobalTransactionScanner初始化完了TM和RM以后,我们再关注一下AbstractAutoProxyCreator,自动代理。

    自动代理,它代理啥东西呢?或者说它给spring中的Bean增强了什么功能?

    GlobalTransactionScanner主要扩展了AbstractAutoProxyCreator的两个方法

    1)wrapIfNecessary:代理增强的前置判断处理,表示是否该Bean需要增强,如果增强的话创建代理类

    2)postProcessAfterInitialization:当一个spring的Bean已经初始化完毕的时候,后置处理一些东西

    wrapIfNecessary前置处理

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                // 相同Bean排重
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
    
                interceptor = null;
                // 判断是否开启TCC模式
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    // TCC实现的拦截器
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
    
                    // 判断是否存在@GlobalTransactional或者@GlobalLock注解
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
    
                    if (interceptor == null) {
                        // 非TCC的拦截器
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);
                    }
                }
                // 判断当前Bean是否已经是spring的代理类了
                if (!AopUtils.isAopProxy(bean)) {
                    // 如果还不是,那么走一轮spring的代理过程即可
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // 如果是一个spring的代理类,那么反射获取代理类中已经存在的拦截器集合,然后添加到该集合当中
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
    
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {}
    }

    wrapIfNecessary方法较长我们分步骤看看

    1)isTccAutoProxy判断是否开启tcc模式,开启的话选择了TccActionInterceptor拦截器,非tcc模式选择GlobalTransactionalInterceptor拦截器,默认不开启

    2)existAnnotation判断当前Bean是否有类或者接口的方法存在@GlobalTransactional或者@GlobalLock注解,如果没有则直接返回

    3)isAopProxy方法是判断当前的Bean是否已经是spring的代理类了,无论是JDK动态代理还是Cglib类代理。如果是普通的Bean,走原有的生成代理逻辑即可,如果已经是代理类,那么要通过反射获取代理对象内的拦截器集合也叫做Advisor,直接添加到该集合当中。

    wrapIfNecessary的方法并不复杂,但是如果对代理不是很熟悉或许对细节点会有些困惑。

    postProcessAfterInitialization数据源代理

    wrapIfNecessary创建了代理类,最后看看后置处理又做了啥。跟进该方法

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 判断是否数据源,且不是数据源代理
        if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
            // 创建静态代理
            DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
            Class<?>[] interfaces = SpringProxyUtils.getAllInterfaces(bean);
            // 创建动态代理类
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
                    if (null != m) {
                        // 如果静态代理存在目标对象的代理方法,那么调用该代理方法,从而调用目标方法
                        return m.invoke(dataSourceProxy, args);
                    } else {
                        // 如果静态代理不存在目标对象的代理方法,那么直接调用目标对象的代理方法
                        boolean oldAccessible = method.isAccessible();
                        try {
                            method.setAccessible(true);
                            return method.invoke(bean, args);
                        } finally {
                            //recover the original accessible for security reason
                            method.setAccessible(oldAccessible);
                        }
                    }
                }
            });
        }
        // 不需要处理数据源代理的,按照原有逻辑处理
        return super.postProcessAfterInitialization(bean, beanName);
    }

    这里有两大块逻辑:

    1)是数据源,且需要进行数据源代理的,那么特立独行地走if内的逻辑

    2)不需要数据源代理的,那么走原有逻辑

    我们关注一下数据源自代理的逻辑,数据源代理或许是seata非常重要的实现之一

    首先,DataSource是一个接口,DataSourceProxy对DataSource是一种实现的关系。但是,请注意!!!spring中的数据源的Bean和DataSource是实现关系,可是该Bean和DataSourceProxy并不是继承或者实现的关系,而是组合关系。

    因此,DataSourceProxy作为数据源Bean的静态代理而存在,而它实现了DataSource的接口,但是很有可能并未实现Bean的一些接口。

    这样一来,就很有必要创建一个动态代理类,来关联一下DataSourceProxy和Bean之间的关系了。如果DataSourceProxy代理过的方法,那么调用代理方法,如果没有就直接调用Bean中的方法。

    后置处理的大体逻辑就是这样,基本上就是对数据源进行自动代理处理。

    总结

    本文主要是自动配置的主体逻辑,自动配置围绕着GlobalTransactionScanner这个Bean展开。核心逻辑主要是三块:

    1)初始化TransactionManager和ResourceManager的RPC客户端

    2) 对@GlobalTransactional和@GlobalLock注解的方法进行增强

    3)数据源代理

  • 相关阅读:
    TCP链接异常断开后,对端仍然ESTABLISH
    mds0: Many clients (191) failing to respond to cache pressure
    Linux C/C++编译过程中的各种not declared in this scope
    Centos7添加磁盘并分区格式化
    Windows Terminal——安装并配置主题
    Kbone——创建项目报错 name can no longer contain capital letters
    Redis——大批量删除redis的key
    Redis——设置最大内存 | key淘汰机制
    Nightingale——滴滴夜莺部署【一】
    ELK——使用Docker快速搭建
  • 原文地址:https://www.cnblogs.com/lay2017/p/12434848.html
Copyright © 2011-2022 走看看