zoukankan      html  css  js  c++  java
  • 基于Spring读写分离

    为什么是基于Spring的呢,因为实现方案基于Spring的事务以及AbstractRoutingDataSource(spring中的一个基础类,可以在其中放多个数据源,然后根据一些规则来确定当前需要使用哪个数据,既可以进行读写分离,也可以用来做分库分表)

    我们只需要实现

    
     determineCurrentLookupKey()
    
    

    每次生成jdbc connection时,都会先调用该方法来甄选出实际需要使用的datasource,由于这个方法并没有参数,因此,最佳的方式是基于ThreadLocal变量

    
     @Component
    @Primary
    public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    
        @Resource(name = "masterDs")
        private DataSource masterDs;
    
        @Resource(name = "slaveDs")
        private DataSource slaveDs;
    
        @Override
        public void afterPropertiesSet() {
            Map<Object, Object> targetDataSources = new HashMap<>();
            targetDataSources.put("slaveDs", slaveDs);
            targetDataSources.put("masterDs", masterDs);
            this.setTargetDataSources(targetDataSources);
            this.setDefaultTargetDataSource(masterDs);
            super.afterPropertiesSet();
        }
    
        @Override
        protected Object determineCurrentLookupKey() {
            return DynamicDataSourceHolder.contextHolder.get();
        }
    
    
        /**
         * 持有当前线程所有数据源的程序
         */
        public static class DynamicDataSourceHolder {
            public static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
            public static void setWrite() {
                contextHolder.set("masterDs");
            }
    
            public static void setRead() {
                contextHolder.set("slaveDs");
            }
    
            public static void remove() {
                contextHolder.remove();
            }
    
        }
    }
    
    

    DynamicRoutingDataSource 仅仅是一个DataSource实现,更高层的类主要用它来创建连接,如getConnection(),getConnection会先寻找实际的datasource,在创建连接

    
    	@Override
    	public Connection getConnection() throws SQLException {
    		return determineTargetDataSource().getConnection();
    	}
    
    

    以上代码,使用了ThreadLocal contextHolder来存取当前需要的datasource的loopkey
    contextHolder上的值可能没有被初始化,此时contextHolder.get() 等于 null,此时会使用默认的datasource,我们设置的默认值对应的是主库

      
    	/**
    	 * Retrieve the current target DataSource. Determines the
    	 * {@link #determineCurrentLookupKey() current lookup key}, performs
    	 * a lookup in the {@link #setTargetDataSources targetDataSources} map,
    	 * falls back to the specified
    	 * {@link #setDefaultTargetDataSource default target DataSource} if necessary.
    	 * @see #determineCurrentLookupKey()
    	 */
    	protected DataSource determineTargetDataSource() {
    		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
    		Object lookupKey = determineCurrentLookupKey();
    		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
    		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
    			dataSource = this.resolvedDefaultDataSource;
    		}
    		if (dataSource == null) {
    			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
    		}
    		return dataSource;
    	}
    
    

    下一步,是在sql请求前,先对contextHolder赋值,手动赋值工作量很大,并且容易出错,也没有办法规范

    实现方案是基于Aop,根据方法名,或者方法注解来区分

    • 根据方法名的好处比较明显,只要工程的managerdao层方法名称规范就行,不用到处添加注解,缺点是不精准
    • 根据注解来区分,缺点是需要在很多个类或接口上加注解,优点是精准

    如果有特殊的业务,可以两种情况都使用,如以下场景:

    • 一个大的service,先后调用多个manager层dao层sql,先insert、后query,并且想直接查主库,也就是写后立即查,由于mysql主从同步可能会有延迟,写后立即查可能会读到老数据,写后立即查的情况比较复杂,如果不是事务的话,实现其实比较复杂,如何在非事务场景下,让两个顺序执行的请求,保持同一个connection,需单独调研,根据实际情况进行修改

    我们将设置contextHolder的地方加在了dao层,出于以下考量:

    • 透过manager层,直接调用dao时,无风险
    • 当前工程采用的是手动在manager层开启事务,开启事务lookupKey一定为null,采用默认的masterDs,没有问题,事务开启链接后,dao层的每个被调用的方法,会使用事务中的链接(由Spring Transaction控制)
    • 如果在manager层开启事务,manager层的方法名可能不规范,dao层是最接近sql请求的地方,规范更容易遵循

    当然,这不是最佳实践,如果在manager层做这个事,也是可以的,看具体的情况,要求是,名称或注解表意为query的方法,里面不能做任何更新操作,因为manager层已经确定了它会查从库,以下方法是会执行失败的

    
    public class Manager1{
    
      ......
      public List getSome(params){
           dao1.getRecord1(params);
           dao2.updateLog(params);
      }
    
    }
    
    

    因为dao2是一个更新请求,使用从库进行更新,肯定是会失败的
    (使用Spring Boot,或Spring时,需确保开启AspectJ,Spring Boot开启方式 @EnableAspectJAutoProxy(proxyTargetClass = true)

    aop示例

     @Aspect
    @Order(-10)
    @Component
    public class DataSourceAspect {
    
        public static final Logger logger = LoggerFactory.getLogger(DataSourceAspect.class);
    
        private static final String[] DefaultSlaveMethodStart
                = new String[]{"query", "find", "get", "select", "count", "list"};
    
        /**
         * 切入点,所有的mapper pcakge下面的类的方法
         */
        @Pointcut(value = "execution(* com.xx.xx.dao.mapper..*.*(..))")
        @SuppressWarnings("all")
        public void pointCutTransaction() {
        }
    
        /**
         * 根据方法名称,判断是读还是写
         * @param jp
         */
        @Before("pointCutTransaction()")
        public void doBefore(JoinPoint jp) {
            String methodName = jp.getSignature().getName();
            if (isReadReq(methodName)) {
                DynamicRoutingDataSource.DynamicDataSourceHolder.setRead();
            } else {
                DynamicRoutingDataSource.DynamicDataSourceHolder.setWrite();
            }
        }
    
        /**
         * 方法结束 finally 时执行
         * @param jp
         */
        @After("pointCutTransaction()")
        public void after(JoinPoint jp) {
            DynamicRoutingDataSource.DynamicDataSourceHolder.remove();
        }
    
        /**
         * 根据方法名,判断是否为读请求
         *
         * @param methodName
         * @return
         */
        private boolean isReadReq(String methodName) {
            for (String start : DefaultSlaveMethodStart) {
                if (methodName.startsWith(start)) {
                    return true;
                }
            }
            return false;
        }
    }
    
    

    上面的代码,根据方法名称前缀,反向判断哪些方法应该使用从库,凡是不匹配的方法,都走主库
    方法结束后,必须清空contextHolder,否则他可能发生混乱,如这里是manager层 call dao层,dao层退出执行后,不清空contextHolder,则manager层开启事务时,会直接使用dao的值,如果这个请求是query,分配给从库了,那么manager层开启事务时就用的是从库了,结果可想而知

    如此就完成了读写分离

    spring 事务

    当前使用的是编程声明式事务

    	@Override
    	public <T> T execute(TransactionCallback<T> action) throws TransactionException {
    		if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
    			return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
    		}
    		else {
    			TransactionStatus status = this.transactionManager.getTransaction(this);
    			T result;
    			try {
    				result = action.doInTransaction(status);
    			}
    			catch (RuntimeException ex) {
    				// Transactional code threw application exception -> rollback
    				rollbackOnException(status, ex);
    				throw ex;
    			}
    			catch (Error err) {
    				// Transactional code threw error -> rollback
    				rollbackOnException(status, err);
    				throw err;
    			}
    			catch (Throwable ex) {
    				// Transactional code threw unexpected exception -> rollback
    				rollbackOnException(status, ex);
    				throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
    			}
    			this.transactionManager.commit(status);
    			return result;
    		}
    	}
    

    来发起事务,execute方法中

        this.transactionManager.getTransaction(this)
    

    将获取事务需要的链接,其内部会读取ThreadLocal变量,判断是否有事务连接,如果已有,或允许嵌套事务,则会重复利用当前事务链接
    如果当前请求已经被包含到了事务中,则根据策略,判断是否新开一个事务或不使用事务,它们的方法基本相同:通过创建一个新的连接来处理,并将当前已被打开的事务先挂起,等待当前操作执行结束后,再恢复外部事务,继续执行
    TransactionSynchronizationManager类记录了这些ThreadLocal变量,允许将事务bind到其resources中,DatasourceTransactionManager则会触发这些操作

    
    	@Override
    	protected Object doSuspend(Object transaction) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		txObject.setConnectionHolder(null);
    		return TransactionSynchronizationManager.unbindResource(this.dataSource);  //挂起事务的基本原理:将外部事务放到新建事务(可能是非事务)的suspendedResources上来进行挂起
    	}
    
    	@Override
    	protected void doResume(Object transaction, Object suspendedResources) {
    		TransactionSynchronizationManager.bindResource(this.dataSource, suspendedResources);  //将suspendedResources重新绑定到threadLocal变量
    	}
    
    
  • 相关阅读:
    sprintf使用
    Android ListView保持选中项高亮
    Creational Patterns创建型模式
    C和指针终于看到指针这一章
    C++随笔001
    TCP reset
    开始看设计模式英文版了
    Excel条件求和
    linux中安装软件,查看、卸载已安装软件方法
    linux vi文本编辑器三种模式切换及常用操作
  • 原文地址:https://www.cnblogs.com/windliu/p/8920467.html
Copyright © 2011-2022 走看看