zoukankan      html  css  js  c++  java
  • 基于Spring读写分离

    为什么是基于Spring的呢,因为实现方案基于Spring的事务以及AbstractRoutingDataSource(spring中的一个基础类,可以在其中放多个数据源,然后根据一些规则来确定当前需要使用哪个数据,既可以进行读写分离,也可以用来做分库分表)

    我们只需要实现

    
     determineCurrentLookupKey()
    
    

    每次生成jdbc connection时,都会先调用该方法来甄选出实际需要使用的datasource,由于这个方法并没有参数,因此,最佳的方式是基于ThreadLocal变量

    
     @Component
    @Primary
    public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    
        @Resource(name = "masterDs")
        private DataSource masterDs;
    
        @Resource(name = "slaveDs")
        private DataSource slaveDs;
    
        @Override
        public void afterPropertiesSet() {
            Map<Object, Object> targetDataSources = new HashMap<>();
            targetDataSources.put("slaveDs", slaveDs);
            targetDataSources.put("masterDs", masterDs);
            this.setTargetDataSources(targetDataSources);
            this.setDefaultTargetDataSource(masterDs);
            super.afterPropertiesSet();
        }
    
        @Override
        protected Object determineCurrentLookupKey() {
            return DynamicDataSourceHolder.contextHolder.get();
        }
    
    
        /**
         * 持有当前线程所有数据源的程序
         */
        public static class DynamicDataSourceHolder {
            public static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
            public static void setWrite() {
                contextHolder.set("masterDs");
            }
    
            public static void setRead() {
                contextHolder.set("slaveDs");
            }
    
            public static void remove() {
                contextHolder.remove();
            }
    
        }
    }
    
    

    DynamicRoutingDataSource 仅仅是一个DataSource实现,更高层的类主要用它来创建连接,如getConnection(),getConnection会先寻找实际的datasource,在创建连接

    
    	@Override
    	public Connection getConnection() throws SQLException {
    		return determineTargetDataSource().getConnection();
    	}
    
    

    以上代码,使用了ThreadLocal contextHolder来存取当前需要的datasource的loopkey
    contextHolder上的值可能没有被初始化,此时contextHolder.get() 等于 null,此时会使用默认的datasource,我们设置的默认值对应的是主库

      
    	/**
    	 * Retrieve the current target DataSource. Determines the
    	 * {@link #determineCurrentLookupKey() current lookup key}, performs
    	 * a lookup in the {@link #setTargetDataSources targetDataSources} map,
    	 * falls back to the specified
    	 * {@link #setDefaultTargetDataSource default target DataSource} if necessary.
    	 * @see #determineCurrentLookupKey()
    	 */
    	protected DataSource determineTargetDataSource() {
    		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
    		Object lookupKey = determineCurrentLookupKey();
    		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
    		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
    			dataSource = this.resolvedDefaultDataSource;
    		}
    		if (dataSource == null) {
    			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
    		}
    		return dataSource;
    	}
    
    

    下一步,是在sql请求前,先对contextHolder赋值,手动赋值工作量很大,并且容易出错,也没有办法规范

    实现方案是基于Aop,根据方法名,或者方法注解来区分

    • 根据方法名的好处比较明显,只要工程的managerdao层方法名称规范就行,不用到处添加注解,缺点是不精准
    • 根据注解来区分,缺点是需要在很多个类或接口上加注解,优点是精准

    如果有特殊的业务,可以两种情况都使用,如以下场景:

    • 一个大的service,先后调用多个manager层dao层sql,先insert、后query,并且想直接查主库,也就是写后立即查,由于mysql主从同步可能会有延迟,写后立即查可能会读到老数据,写后立即查的情况比较复杂,如果不是事务的话,实现其实比较复杂,如何在非事务场景下,让两个顺序执行的请求,保持同一个connection,需单独调研,根据实际情况进行修改

    我们将设置contextHolder的地方加在了dao层,出于以下考量:

    • 透过manager层,直接调用dao时,无风险
    • 当前工程采用的是手动在manager层开启事务,开启事务lookupKey一定为null,采用默认的masterDs,没有问题,事务开启链接后,dao层的每个被调用的方法,会使用事务中的链接(由Spring Transaction控制)
    • 如果在manager层开启事务,manager层的方法名可能不规范,dao层是最接近sql请求的地方,规范更容易遵循

    当然,这不是最佳实践,如果在manager层做这个事,也是可以的,看具体的情况,要求是,名称或注解表意为query的方法,里面不能做任何更新操作,因为manager层已经确定了它会查从库,以下方法是会执行失败的

    
    public class Manager1{
    
      ......
      public List getSome(params){
           dao1.getRecord1(params);
           dao2.updateLog(params);
      }
    
    }
    
    

    因为dao2是一个更新请求,使用从库进行更新,肯定是会失败的
    (使用Spring Boot,或Spring时,需确保开启AspectJ,Spring Boot开启方式 @EnableAspectJAutoProxy(proxyTargetClass = true)

    aop示例

     @Aspect
    @Order(-10)
    @Component
    public class DataSourceAspect {
    
        public static final Logger logger = LoggerFactory.getLogger(DataSourceAspect.class);
    
        private static final String[] DefaultSlaveMethodStart
                = new String[]{"query", "find", "get", "select", "count", "list"};
    
        /**
         * 切入点,所有的mapper pcakge下面的类的方法
         */
        @Pointcut(value = "execution(* com.xx.xx.dao.mapper..*.*(..))")
        @SuppressWarnings("all")
        public void pointCutTransaction() {
        }
    
        /**
         * 根据方法名称,判断是读还是写
         * @param jp
         */
        @Before("pointCutTransaction()")
        public void doBefore(JoinPoint jp) {
            String methodName = jp.getSignature().getName();
            if (isReadReq(methodName)) {
                DynamicRoutingDataSource.DynamicDataSourceHolder.setRead();
            } else {
                DynamicRoutingDataSource.DynamicDataSourceHolder.setWrite();
            }
        }
    
        /**
         * 方法结束 finally 时执行
         * @param jp
         */
        @After("pointCutTransaction()")
        public void after(JoinPoint jp) {
            DynamicRoutingDataSource.DynamicDataSourceHolder.remove();
        }
    
        /**
         * 根据方法名,判断是否为读请求
         *
         * @param methodName
         * @return
         */
        private boolean isReadReq(String methodName) {
            for (String start : DefaultSlaveMethodStart) {
                if (methodName.startsWith(start)) {
                    return true;
                }
            }
            return false;
        }
    }
    
    

    上面的代码,根据方法名称前缀,反向判断哪些方法应该使用从库,凡是不匹配的方法,都走主库
    方法结束后,必须清空contextHolder,否则他可能发生混乱,如这里是manager层 call dao层,dao层退出执行后,不清空contextHolder,则manager层开启事务时,会直接使用dao的值,如果这个请求是query,分配给从库了,那么manager层开启事务时就用的是从库了,结果可想而知

    如此就完成了读写分离

    spring 事务

    当前使用的是编程声明式事务

    	@Override
    	public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    		if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
    			return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
    		}
    		else {
    			TransactionStatus status = this.transactionManager.getTransaction(this);
    			T result;
    			try {
    				result = action.doInTransaction(status);
    			}
    			catch (RuntimeException ex) {
    				// Transactional code threw application exception -> rollback
    				rollbackOnException(status, ex);
    				throw ex;
    			}
    			catch (Error err) {
    				// Transactional code threw error -> rollback
    				rollbackOnException(status, err);
    				throw err;
    			}
    			catch (Throwable ex) {
    				// Transactional code threw unexpected exception -> rollback
    				rollbackOnException(status, ex);
    				throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
    			}
    			this.transactionManager.commit(status);
    			return result;
    		}
    	}
    

    来发起事务,execute方法中

        this.transactionManager.getTransaction(this)
    

    将获取事务需要的链接,其内部会读取ThreadLocal变量,判断是否有事务连接,如果已有,或允许嵌套事务,则会重复利用当前事务链接
    如果当前请求已经被包含到了事务中,则根据策略,判断是否新开一个事务或不使用事务,它们的方法基本相同:通过创建一个新的连接来处理,并将当前已被打开的事务先挂起,等待当前操作执行结束后,再恢复外部事务,继续执行
    TransactionSynchronizationManager类记录了这些ThreadLocal变量,允许将事务bind到其resources中,DatasourceTransactionManager则会触发这些操作

    
    	@Override
    	protected Object doSuspend(Object transaction) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		txObject.setConnectionHolder(null);
    		return TransactionSynchronizationManager.unbindResource(this.dataSource);  //挂起事务的基本原理:将外部事务放到新建事务(可能是非事务)的suspendedResources上来进行挂起
    	}
    
    	@Override
    	protected void doResume(Object transaction, Object suspendedResources) {
    		TransactionSynchronizationManager.bindResource(this.dataSource, suspendedResources);  //将suspendedResources重新绑定到threadLocal变量
    	}
    
    
  • 相关阅读:
    121. Best Time to Buy and Sell Stock
    70. Climbing Stairs
    647. Palindromic Substrings
    609. Find Duplicate File in System
    583. Delete Operation for Two Strings
    556 Next Greater Element III
    553. Optimal Division
    539. Minimum Time Difference
    537. Complex Number Multiplication
    227. Basic Calculator II
  • 原文地址:https://www.cnblogs.com/windliu/p/8920467.html
Copyright © 2011-2022 走看看