  • Source code analysis of the renren-fast rapid development platform: multiple data sources

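    The project can switch data sources for specific methods, based on the database declared in an annotation. Let's see how it does that (although I'm not sure what the practical use is).
    First, check whether any of the Spring Boot config classes deal with data sources. Starting from RenrenApplication, one is easy to find: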

    /**
     * Multi-data-source configuration
     * @author chenshun
     * @email sunlightcs@gmail.com
     * @date 2017/8/19 0:41
     */
    @Configuration
    public class DynamicDataSourceConfig {
    
        @Bean
        @ConfigurationProperties("spring.datasource.druid.first")
        public DataSource firstDataSource(){
            return DruidDataSourceBuilder.create().build();
        }
    
        @Bean
        @ConfigurationProperties("spring.datasource.druid.second")
        public DataSource secondDataSource(){
            return DruidDataSourceBuilder.create().build();
        }
    
        @Bean
        @Primary
        public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
            Map<Object, Object> targetDataSources = new HashMap<>();
            targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
            targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
            return new DynamicDataSource(firstDataSource, targetDataSources);
        }
    }

    Following the data source configuration, we find DynamicDataSource in the datasources package:

    /**
     * Dynamic data source
     * @author chenshun
     * @email sunlightcs@gmail.com
     * @date 2017/8/19 1:03
     */
    public class DynamicDataSource extends AbstractRoutingDataSource {
        private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
        public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
            super.setDefaultTargetDataSource(defaultTargetDataSource);
            super.setTargetDataSources(targetDataSources);
            super.afterPropertiesSet();
        }
    
        @Override
        protected Object determineCurrentLookupKey() {
            return getDataSource();
        }
    
        public static void setDataSource(String dataSource) {
            contextHolder.set(dataSource);
        }
    
        public static String getDataSource() {
            return contextHolder.get();
        }
    
        public static void clearDataSource() {
            contextHolder.remove();
        }
    
    }
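    This class alone does not reveal much: it holds a static final ThreadLocal field, and static methods set, read, and clear its value. So let's look at the parent class, AbstractRoutingDataSource: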
    /**
     * Abstract {@link javax.sql.DataSource} implementation that routes {@link #getConnection()}
     * calls to one of various target DataSources based on a lookup key. The latter is usually
     * (but not necessarily) determined through some thread-bound transaction context.
     *
     * @author Juergen Hoeller
     * @since 2.0.1
     * @see #setTargetDataSources
     * @see #setDefaultTargetDataSource
     * @see #determineCurrentLookupKey()
     */
    public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
    
        @Nullable
        private Map<Object, Object> targetDataSources;
    
        @Nullable
        private Object defaultTargetDataSource;
    
        private boolean lenientFallback = true;
    
        private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
    
        @Nullable
        private Map<Object, DataSource> resolvedDataSources;
    
        @Nullable
        private DataSource resolvedDefaultDataSource;
    
    
        /**
         * Specify the map of target DataSources, with the lookup key as key.
         * The mapped value can either be a corresponding {@link javax.sql.DataSource}
         * instance or a data source name String (to be resolved via a
         * {@link #setDataSourceLookup DataSourceLookup}).
         * <p>The key can be of arbitrary type; this class implements the
         * generic lookup process only. The concrete key representation will
         * be handled by {@link #resolveSpecifiedLookupKey(Object)} and
         * {@link #determineCurrentLookupKey()}.
         */
        public void setTargetDataSources(Map<Object, Object> targetDataSources) {
            this.targetDataSources = targetDataSources;
        }
    
        /**
         * Specify the default target DataSource, if any.
         * <p>The mapped value can either be a corresponding {@link javax.sql.DataSource}
         * instance or a data source name String (to be resolved via a
         * {@link #setDataSourceLookup DataSourceLookup}).
         * <p>This DataSource will be used as target if none of the keyed
         * {@link #setTargetDataSources targetDataSources} match the
         * {@link #determineCurrentLookupKey()} current lookup key.
         */
        public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
            this.defaultTargetDataSource = defaultTargetDataSource;
        }
    
        /**
         * Specify whether to apply a lenient fallback to the default DataSource
         * if no specific DataSource could be found for the current lookup key.
         * <p>Default is "true", accepting lookup keys without a corresponding entry
         * in the target DataSource map - simply falling back to the default DataSource
         * in that case.
         * <p>Switch this flag to "false" if you would prefer the fallback to only apply
         * if the lookup key was {@code null}. Lookup keys without a DataSource
         * entry will then lead to an IllegalStateException.
         * @see #setTargetDataSources
         * @see #setDefaultTargetDataSource
         * @see #determineCurrentLookupKey()
         */
        public void setLenientFallback(boolean lenientFallback) {
            this.lenientFallback = lenientFallback;
        }
    
        /**
         * Set the DataSourceLookup implementation to use for resolving data source
         * name Strings in the {@link #setTargetDataSources targetDataSources} map.
         * <p>Default is a {@link JndiDataSourceLookup}, allowing the JNDI names
         * of application server DataSources to be specified directly.
         */
        public void setDataSourceLookup(@Nullable DataSourceLookup dataSourceLookup) {
            this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());
        }
    
    
        @Override
        public void afterPropertiesSet() {
            if (this.targetDataSources == null) {
                throw new IllegalArgumentException("Property 'targetDataSources' is required");
            }
            this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
            this.targetDataSources.forEach((key, value) -> {
                Object lookupKey = resolveSpecifiedLookupKey(key);
                DataSource dataSource = resolveSpecifiedDataSource(value);
                this.resolvedDataSources.put(lookupKey, dataSource);
            });
            if (this.defaultTargetDataSource != null) {
                this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
            }
        }
    
        /**
         * Resolve the given lookup key object, as specified in the
         * {@link #setTargetDataSources targetDataSources} map, into
         * the actual lookup key to be used for matching with the
         * {@link #determineCurrentLookupKey() current lookup key}.
         * <p>The default implementation simply returns the given key as-is.
         * @param lookupKey the lookup key object as specified by the user
         * @return the lookup key as needed for matching
         */
        protected Object resolveSpecifiedLookupKey(Object lookupKey) {
            return lookupKey;
        }
    
        /**
         * Resolve the specified data source object into a DataSource instance.
         * <p>The default implementation handles DataSource instances and data source
         * names (to be resolved via a {@link #setDataSourceLookup DataSourceLookup}).
         * @param dataSource the data source value object as specified in the
         * {@link #setTargetDataSources targetDataSources} map
         * @return the resolved DataSource (never {@code null})
         * @throws IllegalArgumentException in case of an unsupported value type
         */
        protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
            if (dataSource instanceof DataSource) {
                return (DataSource) dataSource;
            }
            else if (dataSource instanceof String) {
                return this.dataSourceLookup.getDataSource((String) dataSource);
            }
            else {
                throw new IllegalArgumentException(
                        "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
            }
        }
    
    
        @Override
        public Connection getConnection() throws SQLException {
            return determineTargetDataSource().getConnection();
        }
    
        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return determineTargetDataSource().getConnection(username, password);
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> T unwrap(Class<T> iface) throws SQLException {
            if (iface.isInstance(this)) {
                return (T) this;
            }
            return determineTargetDataSource().unwrap(iface);
        }
    
        @Override
        public boolean isWrapperFor(Class<?> iface) throws SQLException {
            return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
        }
    
        /**
         * Retrieve the current target DataSource. Determines the
         * {@link #determineCurrentLookupKey() current lookup key}, performs
         * a lookup in the {@link #setTargetDataSources targetDataSources} map,
         * falls back to the specified
         * {@link #setDefaultTargetDataSource default target DataSource} if necessary.
         * @see #determineCurrentLookupKey()
         */
        protected DataSource determineTargetDataSource() {
            Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
            Object lookupKey = determineCurrentLookupKey();
            DataSource dataSource = this.resolvedDataSources.get(lookupKey);
            if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
                dataSource = this.resolvedDefaultDataSource;
            }
            if (dataSource == null) {
                throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
            }
            return dataSource;
        }
    
        /**
         * Determine the current lookup key. This will typically be
         * implemented to check a thread-bound transaction context.
         * <p>Allows for arbitrary keys. The returned key needs
         * to match the stored lookup key type, as resolved by the
         * {@link #resolveSpecifiedLookupKey} method.
         */
        @Nullable
        protected abstract Object determineCurrentLookupKey();
    
    }

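    As the Javadoc indicates, this class overrides getConnection(). Its getConnection() calls determineTargetDataSource(), which uses the key returned by determineCurrentLookupKey() to pick a data source (falling back to the default when the key is null, or when it is unmatched and lenientFallback is enabled) and then opens a connection on it. Combined with the project's own DynamicDataSource, the conclusion is easy to draw: the project chooses the database connection based on the value held in the ThreadLocal. We will confirm this later; first let's see where the project switches databases.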
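
    The lookup described above can be modeled without Spring. The sketch below is a simplified stand-in and not Spring's code: RoutingSketch, resolve(), setKey() and clearKey() are illustrative names, and plain strings stand in for real DataSource objects. It only shows the key lookup and the two fallback rules.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the lookup in AbstractRoutingDataSource.
// Strings stand in for DataSource instances; all names are illustrative.
final class RoutingSketch {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private final Map<String, String> targets = new HashMap<>();
    private final String defaultTarget;
    private final boolean lenientFallback;

    RoutingSketch(String defaultTarget, Map<String, String> targets, boolean lenientFallback) {
        this.defaultTarget = defaultTarget;
        this.targets.putAll(targets);
        this.lenientFallback = lenientFallback;
    }

    static void setKey(String key) { KEY.set(key); }
    static void clearKey() { KEY.remove(); }

    // Same shape as determineTargetDataSource(): look up the key, then fall back.
    String resolve() {
        String lookupKey = KEY.get();
        String target = targets.get(lookupKey);
        if (target == null && (lenientFallback || lookupKey == null)) {
            target = defaultTarget;
        }
        if (target == null) {
            throw new IllegalStateException(
                    "Cannot determine target for lookup key [" + lookupKey + "]");
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("first", "db-first");
        targets.put("second", "db-second");
        RoutingSketch router = new RoutingSketch("db-first", targets, true);

        System.out.println(router.resolve()); // no key bound: default target
        setKey("second");
        System.out.println(router.resolve()); // key bound: routed target
        setKey("unknown");
        System.out.println(router.resolve()); // unmatched key, lenient: default target
        clearKey();
    }
}
```

    With lenientFallback set to false, the "unknown" key would raise an IllegalStateException instead of silently using the default, which can make a mistyped data source name easier to notice.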

    The datasources package also contains an aspect package, which immediately suggests that data sources are switched through AOP. Straight to the code:
    DataSourceAspect

    /**
     * Multiple data sources: aspect handler class
     * @author chenshun
     * @email sunlightcs@gmail.com
     * @date 2017/9/16 22:20
     */
    @Aspect
    @Component
    public class DataSourceAspect implements Ordered {
        protected Logger logger = LoggerFactory.getLogger(getClass());
    
        @Pointcut("@annotation(io.renren.datasources.annotation.DataSource)")
        public void dataSourcePointCut() {
    
        }
    
        @Around("dataSourcePointCut()")
        public Object around(ProceedingJoinPoint point) throws Throwable {
            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();
    
            DataSource ds = method.getAnnotation(DataSource.class);
            if(ds == null){
                DynamicDataSource.setDataSource(DataSourceNames.FIRST);
                logger.debug("set datasource is " + DataSourceNames.FIRST);
            }else {
                DynamicDataSource.setDataSource(ds.name());
                logger.debug("set datasource is " + ds.name());
            }
    
            try {
                return point.proceed();
            } finally {
                DynamicDataSource.clearDataSource();
                logger.debug("clean datasource");
            }
        }
    
        @Override
        public int getOrder() {
            return 1;
        }
    }

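    Sure enough, the aspect reads the data source name from the @DataSource annotation on the invoked method and uses it to decide which data source to use. Combined with the code above, the conclusion is: when a method annotated with @DataSource is called, the data source key for the current thread is changed. As far as I recall, a Servlet container handles each request on a single thread by default, so the switch affects everything that request's thread does while the annotated method is running; the finally block clears the key again when the method returns.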
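
    The same set / proceed / clear pattern can be tried out with only the JDK. In the sketch below a java.lang.reflect.Proxy plays the role that Spring AOP plays in the project. Everything in it is an assumption made for illustration: SwitchSketch, the @Ds annotation, and UserQueries are hypothetical names, and the "service" simply reports which key is bound while it runs.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Plain-JDK model of the aspect: a dynamic proxy stands in for Spring AOP.
// Every name here (SwitchSketch, Ds, UserQueries, ...) is illustrative.
final class SwitchSketch {
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Ds { String name(); }

    interface UserQueries {
        String queryUser();
        @Ds(name = "second") String queryUser2();
    }

    // The "service": returns the key that is bound while the method runs.
    static final class UserQueriesImpl implements UserQueries {
        public String queryUser()  { return String.valueOf(KEY.get()); }
        public String queryUser2() { return String.valueOf(KEY.get()); }
    }

    static UserQueries proxy(UserQueries target) {
        return (UserQueries) Proxy.newProxyInstance(
                UserQueries.class.getClassLoader(),
                new Class<?>[] { UserQueries.class },
                (p, method, args) -> {
                    Ds ds = method.getAnnotation(Ds.class);
                    if (ds == null) {
                        return invoke(target, method, args); // not annotated: no switching
                    }
                    KEY.set(ds.name());
                    try {
                        return invoke(target, method, args);
                    } finally {
                        KEY.remove(); // always unbind, even when the call throws
                    }
                });
    }

    private static Object invoke(Object target, Method m, Object[] args) throws Throwable {
        try {
            return m.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow the original exception
        }
    }

    public static void main(String[] args) {
        UserQueries q = proxy(new UserQueriesImpl());
        System.out.println(q.queryUser());  // null: the default data source would apply
        System.out.println(q.queryUser2()); // second
        System.out.println(KEY.get());      // null: the key was cleared after the call
    }
}
```

    One consequence of clearing in finally, visible in this model as well as in the aspect: if an annotated method calls another annotated method through the proxy, the inner call's remove() also discards the outer method's key, so the rest of the outer method runs against the default data source.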

    A global search for @DataSource shows that only one test service uses it:

    /**
     * Multi-data-source test
     *
     * @author Mark sunlightcs@gmail.com
     * @since 3.1.0 2018-01-28
     */
    @Service
    public class DataSourceTestService {
        @Autowired
        private SysUserService sysUserService;
    
        public SysUserEntity queryUser(Long userId){
            return sysUserService.selectById(userId);
        }
    
        @DataSource(name = DataSourceNames.SECOND)
        public SysUserEntity queryUser2(Long userId){
            return sysUserService.selectById(userId);
        }
    }

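    That is all the project's own source code does for switching data sources.

    Now let's explore at what point MyBatis obtains a connection from the data source.

    First we need to know how MyBatis works. The official documentation covers this: MyBatis-Spring first creates a SqlSessionFactory instance through SqlSessionFactoryBean, and then creates a SqlSession when appropriate. My first guess was that the database connection is already open when the SqlSession is created, since it has to manage transactions; as the code further down shows, the connection is in fact opened lazily. Either way, we can go straight to the SqlSessionFactory implementation class, DefaultSqlSessionFactory: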

    public class DefaultSqlSessionFactory implements SqlSessionFactory {
    
      private final Configuration configuration;
    
      public DefaultSqlSessionFactory(Configuration configuration) {
        this.configuration = configuration;
      }
    
      @Override
      public SqlSession openSession() {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
      }
    
      @Override
      public SqlSession openSession(boolean autoCommit) {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, autoCommit);
      }
    
      @Override
      public SqlSession openSession(ExecutorType execType) {
        return openSessionFromDataSource(execType, null, false);
      }
    
      @Override
      public SqlSession openSession(TransactionIsolationLevel level) {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), level, false);
      }
    
      @Override
      public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) {
        return openSessionFromDataSource(execType, level, false);
      }
    
      @Override
      public SqlSession openSession(ExecutorType execType, boolean autoCommit) {
        return openSessionFromDataSource(execType, null, autoCommit);
      }
    
      @Override
      public SqlSession openSession(Connection connection) {
        return openSessionFromConnection(configuration.getDefaultExecutorType(), connection);
      }
    
      @Override
      public SqlSession openSession(ExecutorType execType, Connection connection) {
        return openSessionFromConnection(execType, connection);
      }
    
      @Override
      public Configuration getConfiguration() {
        return configuration;
      }
    
      private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
          final Environment environment = configuration.getEnvironment();
          final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
          tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
          final Executor executor = configuration.newExecutor(tx, execType);
          return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
          closeTransaction(tx); // may have fetched a connection so lets call close()
          throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
          ErrorContext.instance().reset();
        }
      }
    
      private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
        try {
          boolean autoCommit;
          try {
            autoCommit = connection.getAutoCommit();
          } catch (SQLException e) {
            // Failover to true, as most poor drivers
            // or databases won't support transactions
            autoCommit = true;
          }      
          final Environment environment = configuration.getEnvironment();
          final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
          final Transaction tx = transactionFactory.newTransaction(connection);
          final Executor executor = configuration.newExecutor(tx, execType);
          return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
          throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
          ErrorContext.instance().reset();
        }
      }
    
      private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
        if (environment == null || environment.getTransactionFactory() == null) {
          return new ManagedTransactionFactory();
        }
        return environment.getTransactionFactory();
      }
    
      private void closeTransaction(Transaction tx) {
        if (tx != null) {
          try {
            tx.close();
          } catch (SQLException ignore) {
            // Intentionally ignore. Prefer previous error.
          }
        }
      }
    
    }

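    From this factory's source we can see that a SqlSession is created by openSessionFromDataSource(). In this method:

    1. It first gets the environment from the configuration. Anyone familiar with MyBatis knows that the environment is where the data source is configured.
    2. It then calls getTransactionFactoryFromEnvironment() to obtain a TransactionFactory instance.
    3. The TransactionFactory creates the transaction tx.
    4. The executor is created from this transaction tx.
    5. Finally it creates a DefaultSqlSession, the class we use to run queries against the database.

    Let's go step by step and see when the connection is opened.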

    
    

    Creating the TransactionFactory instance

    Look directly at the source of getTransactionFactoryFromEnvironment():

    private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
      if (environment == null || environment.getTransactionFactory() == null) {
        return new ManagedTransactionFactory();
      }
      return environment.getTransactionFactory();
    }
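    If no transactionFactory is configured, ManagedTransactionFactory is returned. (In a MyBatis-Spring setup, SqlSessionFactoryBean registers a SpringManagedTransactionFactory by default, so that factory is the one actually used; it still obtains its connection from the same DataSource, so the reasoning below carries over.)
    ManagedTransactionFactory.java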
    public class ManagedTransactionFactory implements TransactionFactory {
    
      private boolean closeConnection = true;
    
      @Override
      public void setProperties(Properties props) {
        if (props != null) {
          String closeConnectionProperty = props.getProperty("closeConnection");
          if (closeConnectionProperty != null) {
            closeConnection = Boolean.valueOf(closeConnectionProperty);
          }
        }
      }
    
      @Override
      public Transaction newTransaction(Connection conn) {
        return new ManagedTransaction(conn, closeConnection);
      }
    
      @Override
      public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
        // Silently ignores autocommit and isolation level, as managed transactions are entirely
        // controlled by an external manager.  It's silently ignored so that
        // code remains portable between managed and unmanaged configurations.
        return new ManagedTransaction(ds, level, closeConnection);
      }
    }
    
    

    Creating the transaction from the TransactionFactory

    ManagedTransaction.java

    /**
     * {@link Transaction} that lets the container manage the full lifecycle of the transaction.
     * Delays connection retrieval until getConnection() is called.
     * Ignores all commit or rollback requests.
     * By default, it closes the connection but can be configured not to do it.
     *
     * @author Clinton Begin
     *
     * @see ManagedTransactionFactory
     */
    public class ManagedTransaction implements Transaction {
    
      private static final Log log = LogFactory.getLog(ManagedTransaction.class);
    
      private DataSource dataSource;
      private TransactionIsolationLevel level;
      private Connection connection;
      private final boolean closeConnection;
    
      public ManagedTransaction(Connection connection, boolean closeConnection) {
        this.connection = connection;
        this.closeConnection = closeConnection;
      }
    
      public ManagedTransaction(DataSource ds, TransactionIsolationLevel level, boolean closeConnection) {
        this.dataSource = ds;
        this.level = level;
        this.closeConnection = closeConnection;
      }
    
      @Override
      public Connection getConnection() throws SQLException {
        if (this.connection == null) {
          openConnection();
        }
        return this.connection;
      }
    
      @Override
      public void commit() throws SQLException {
        // Does nothing
      }
    
      @Override
      public void rollback() throws SQLException {
        // Does nothing
      }
    
      @Override
      public void close() throws SQLException {
        if (this.closeConnection && this.connection != null) {
          if (log.isDebugEnabled()) {
            log.debug("Closing JDBC Connection [" + this.connection + "]");
          }
          this.connection.close();
        }
      }
    
      protected void openConnection() throws SQLException {
        if (log.isDebugEnabled()) {
          log.debug("Opening JDBC Connection");
        }
        this.connection = this.dataSource.getConnection();
        if (this.level != null) {
          this.connection.setTransactionIsolation(this.level.getLevel());
        }
      }
    
      @Override
      public Integer getTimeout() throws SQLException {
        return null;
      }
    
    }
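    Here the openConnection() method opens the database connection from dataSource. This links up with the dynamic data source we configured at the start; what remains is to see when it gets called.

    Creating the executor from the transaction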

    public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
        executorType = executorType == null ? defaultExecutorType : executorType;
        executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
        Executor executor;
        if (ExecutorType.BATCH == executorType) {
          executor = new BatchExecutor(this, transaction);
        } else if (ExecutorType.REUSE == executorType) {
          executor = new ReuseExecutor(this, transaction);
        } else {
          executor = new SimpleExecutor(this, transaction);
        }
        if (cacheEnabled) {
          executor = new CachingExecutor(executor);
        }
        executor = (Executor) interceptorChain.pluginAll(executor);
        return executor;
      }
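    Note that the executor is also wrapped with plugins through interceptorChain. This is how the commonly used pagination plugins hook in, but I won't go into that here.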
    
    

    Creating the DefaultSqlSession

    This class has too much code, so I will only paste the key part:

    @Override
    public Connection getConnection() {
      try {
        return executor.getTransaction().getConnection();
      } catch (SQLException e) {
        throw ExceptionFactory.wrapException("Error getting a new connection.  Cause: " + e, e);
      }
    }
    
    

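    This calls getConnection() on the transaction object created earlier, and that method opens a connection on our custom data source. Since getConnection() in our DynamicDataSource picks the target data source based on the ThreadLocal value, as long as the custom aspect has changed the data source key on the current request's thread, by the time that thread has MyBatis open a connection, the connection will be opened on the switched data source.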
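
    The timing argument can be checked with a small JDK-only model. This is a sketch under stated assumptions and not MyBatis code: LazySketch, LazyTx and ROUTER are hypothetical names, a Supplier stands in for the routing data source, and strings stand in for connections.

```java
import java.util.function.Supplier;

// Model of lazy connection acquisition; strings stand in for connections.
// LazySketch, LazyTx and ROUTER are illustrative names, not MyBatis classes.
final class LazySketch {
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Stand-in for the routing data source: the key is read at call time.
    static final Supplier<String> ROUTER =
            () -> "connection to " + (KEY.get() == null ? "first" : KEY.get());

    // Stand-in for ManagedTransaction: opens the connection on first use only.
    static final class LazyTx {
        private final Supplier<String> dataSource;
        private String connection;

        LazyTx(Supplier<String> dataSource) {
            this.dataSource = dataSource;
        }

        String getConnection() {
            if (connection == null) {
                connection = dataSource.get(); // the key is consulted here, not earlier
            }
            return connection;
        }
    }

    public static void main(String[] args) {
        LazyTx tx = new LazyTx(ROUTER);         // nothing has been opened yet
        KEY.set("second");                      // switch after the transaction exists
        System.out.println(tx.getConnection()); // connection to second
        KEY.remove();
        System.out.println(tx.getConnection()); // connection to second (cached)
    }
}
```

    The second call illustrates a limitation that follows from the caching in getConnection(): once a transaction has obtained its connection, changing the key has no effect on that transaction.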

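    This article is not long, but reading the source along the MyBatis workflow described above makes it easy to work out how other features are implemented. For example, Mapper injection in MyBatis-Spring is really done through MyBatis's dynamic proxies: the proxy inspects the Method object of the invoked Mapper interface method, obtains its annotations, method name, parameters, and so on, and then makes the call through a SqlSession.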
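
    The Mapper idea can be illustrated in miniature with a JDK dynamic proxy. This is only a sketch of the general technique, not how MyBatis implements it: MapperSketch, UserMapper, getMapper() and the "session" function are hypothetical, and the session just echoes the statement id and arguments instead of running SQL.

```java
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.BiFunction;

// Model of a Mapper proxy: the interface has no implementation class.
// MapperSketch, UserMapper and the "session" function are illustrative.
final class MapperSketch {
    interface UserMapper {
        String selectById(long id);
    }

    // The "session" receives a statement id plus the call arguments.
    @SuppressWarnings("unchecked")
    static <T> T getMapper(Class<T> type, BiFunction<String, Object[], Object> session) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] { type },
                (proxy, method, args) -> {
                    // Derive a statement id from the interface and method names.
                    String statementId = type.getSimpleName() + "." + method.getName();
                    return session.apply(statementId, args == null ? new Object[0] : args);
                });
    }

    public static void main(String[] args) {
        UserMapper mapper = getMapper(UserMapper.class,
                (statement, params) -> statement + Arrays.toString(params));
        System.out.println(mapper.selectById(1L)); // UserMapper.selectById[1]
    }
}
```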

    Source: Jianshu. Link: https://www.jianshu.com/p/6007d5777cf2

  • Original article: https://www.cnblogs.com/IT-TOP/p/java.html