zoukankan      html  css  js  c++  java
  • Java基础-读写分离实现

    一、是什么
      所谓读写分离,就是应用程序对数据库的操作请求分发到读库和写库,查询操作分发到读库(从库),增删改操作分发到写库(主库)。
    二、有什么用
      读写分离主要是为了解决业务数据量大后,读写操作在一个库上操作引起数据库性能问题,通过将请求按类型分发,缓解数据库压力,提高应用程序的执行效率。
    三、怎么用
      目前读写分离实现方案主要有两种:中间件解决和业务层解决。
      中间件解决指的是使用mysql-proxy、Amoeba等数据库中间件实现读写分离。优点是1、程序无需任何修改,开发人员无感知;2、动态添加数据源无需重启程序。缺点是1、程序依赖中间件,切换数据库可能会很困难;2、 使用中间件,相当于加了一层,对性能肯定有所损耗。
      业务层解决一般就是使用AOP根据方法签名判断操作应该分发到从库还是主库,在AOP中实现操作分发。优点是1、多数据源切换方便,由程序自动完成;2、理论上支持任何数据库。缺点是1、本该应该是数据库层完成的操作(原则上数据库操作就应该由数据来完成)上浮到业务层,增加了编程复杂度及数据库操作控制的混乱程度;2、无法做到动态添加数据源。
      一般我们优先选择在业务层实现读写分离。
      1、定义DynamicDataSource
     
    import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
     
    /**
    * 定义动态数据源,实现通过集成Spring提供的AbstractRoutingDataSource,只需要实现determineCurrentLookupKey方法即可
    *
    * 由于DynamicDataSource是单例的,线程不安全的,所以采用ThreadLocal保证线程安全,由DynamicDataSourceHolder完成。
    *
    * @author zhijun
    *
    */
    public class DynamicDataSource extends AbstractRoutingDataSource{
     
    @Override
    protected Object determineCurrentLookupKey() {
    // 使用DynamicDataSourceHolder保证线程安全,并且得到当前线程中的数据源key
    return DynamicDataSourceHolder.getDataSourceKey();
    }
     
    }
      2、定义DynamicDataSourceHolder
    *
    * 使用ThreadLocal技术来记录当前线程中的数据源的key
    *
    * @author zhijun
    *
    */
    public class DynamicDataSourceHolder {
     
    //写库对应的数据源key
    private static final String MASTER = "master";
     
    //读库对应的数据源key
    private static final String SLAVE = "slave";
     
    //使用ThreadLocal记录当前线程的数据源key
    private static final ThreadLocal<String> holder = new ThreadLocal<String>();
     
    /**
    * 设置数据源key
    * @param key
    */
    public static void putDataSourceKey(String key) {
    holder.set(key);
    }
     
    /**
    * 获取数据源key
    * @return
    */
    public static String getDataSourceKey() {
    return holder.get();
    }
     
    /**
    * 标记写库
    */
    public static void markMaster(){
    putDataSourceKey(MASTER);
    }
     
    /**
    * 标记读库
    */
    public static void markSlave(){
    putDataSourceKey(SLAVE);
    }
     
    } 
      
      3、定义DataSourceAspect切面
    import org.apache.commons.lang3.StringUtils;
    import org.aspectj.lang.JoinPoint;
     
    /**
    * 定义数据源的AOP切面,通过该Service的方法名判断是应该走读库还是写库
    *
    * @author zhijun
    *
    */
    public class DataSourceAspect {
     
    /**
    * 在进入Service方法之前执行
    *
    * @param point 切面对象
    */
    public void before(JoinPoint point) {
    // 获取到当前执行的方法名
    String methodName = point.getSignature().getName();
    if (isSlave(methodName)) {
    // 标记为读库
    DynamicDataSourceHolder.markSlave();
    } else {
    // 标记为写库
    DynamicDataSourceHolder.markMaster();
    }
    }
     
    /**
    * 判断是否为读库
    *
    * @param methodName
    * @return
    */
    private Boolean isSlave(String methodName) {
    // 方法名以query、find、get开头的方法名走从库
    return StringUtils.startsWithAny(methodName, "query", "find", "get");
    }
     
    }
     
      4、配置数据源(jdbc.properties)
    jdbc.master.driver=com.mysql.jdbc.Driver
    jdbc.master.url=jdbc:mysql://127.0.0.1:3306/mybatis_1128?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true
    jdbc.master.username=root
    jdbc.master.password=123456
     
     
    jdbc.slave01.driver=com.mysql.jdbc.Driver
    jdbc.slave01.url=jdbc:mysql://127.0.0.1:3307/mybatis_1128?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true
    jdbc.slave01.username=root
    jdbc.slave01.password=123456
      5、定义连接池
    <!-- 配置连接池 -->
    <bean id="masterDataSource" class="com.jolbox.bonecp.BoneCPDataSource"
    destroy-method="close">
    <!-- 数据库驱动 -->
    <property name="driverClass" value="${jdbc.master.driver}" />
    <!-- 相应驱动的jdbcUrl -->
    <property name="jdbcUrl" value="${jdbc.master.url}" />
    <!-- 数据库的用户名 -->
    <property name="username" value="${jdbc.master.username}" />
    <!-- 数据库的密码 -->
    <property name="password" value="${jdbc.master.password}" />
    <!-- 检查数据库连接池中空闲连接的间隔时间,单位是分,默认值:240,如果要取消则设置为0 -->
    <property name="idleConnectionTestPeriod" value="60" />
    <!-- 连接池中未使用的链接最大存活时间,单位是分,默认值:60,如果要永远存活设置为0 -->
    <property name="idleMaxAge" value="30" />
    <!-- 每个分区最大的连接数 -->
    <property name="maxConnectionsPerPartition" value="150" />
    <!-- 每个分区最小的连接数 -->
    <property name="minConnectionsPerPartition" value="5" />
    </bean>
     
    <!-- 配置连接池 -->
    <bean id="slave01DataSource" class="com.jolbox.bonecp.BoneCPDataSource"
    destroy-method="close">
    <!-- 数据库驱动 -->
    <property name="driverClass" value="${jdbc.slave01.driver}" />
    <!-- 相应驱动的jdbcUrl -->
    <property name="jdbcUrl" value="${jdbc.slave01.url}" />
    <!-- 数据库的用户名 -->
    <property name="username" value="${jdbc.slave01.username}" />
    <!-- 数据库的密码 -->
    <property name="password" value="${jdbc.slave01.password}" />
    <!-- 检查数据库连接池中空闲连接的间隔时间,单位是分,默认值:240,如果要取消则设置为0 -->
    <property name="idleConnectionTestPeriod" value="60" />
    <!-- 连接池中未使用的链接最大存活时间,单位是分,默认值:60,如果要永远存活设置为0 -->
    <property name="idleMaxAge" value="30" />
    <!-- 每个分区最大的连接数 -->
    <property name="maxConnectionsPerPartition" value="150" />
    <!-- 每个分区最小的连接数 -->
    <property name="minConnectionsPerPartition" value="5" />
    </bean>
      6、定义DataSource
    <!-- 定义数据源,使用自己实现的数据源 -->
    <bean id="dataSource" class="cn.itcast.usermanage.spring.DynamicDataSource">
    <!-- 设置多个数据源 -->
    <property name="targetDataSources">
    <map key-type="java.lang.String">
    <!-- 这个key需要和程序中的key一致 -->
    <entry key="master" value-ref="masterDataSource"/>
    <entry key="slave" value-ref="slave01DataSource"/>
    </map>
    </property>
    <!-- 设置默认的数据源,这里默认走写库 -->
    <property name="defaultTargetDataSource" ref="masterDataSource"/>
    </bean>
     
     
      7、配置事务管理以及动态切换数据源切面
      定义事务管理器
    <!-- 定义事务管理器 -->
    <bean id="transactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
    </bean> 
      
      定义事务策略
    <!-- 定义事务策略 -->
    <tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
    <!--定义查询方法都是只读的 -->
    <tx:method name="query*" read-only="true" />
    <tx:method name="find*" read-only="true" />
    <tx:method name="get*" read-only="true" />
     
    <!-- 主库执行操作,事务传播行为定义为默认行为 -->
    <tx:method name="save*" propagation="REQUIRED" />
    <tx:method name="update*" propagation="REQUIRED" />
    <tx:method name="delete*" propagation="REQUIRED" />
     
    <!--其他方法使用默认事务策略 -->
    <tx:method name="*" />
    </tx:attributes>
    </tx:advice>
      定义切面
    <!-- 定义AOP切面处理器 -->
    <bean class="cn.itcast.usermanage.spring.DataSourceAspect" id="dataSourceAspect" />
     
    <aop:config>
    <!-- 定义切面,所有的service的所有方法 -->
    <aop:pointcut id="txPointcut" expression="execution(* xx.xxx.xxxxxxx.service.*.*(..))" />
    <!-- 应用事务策略到Service切面 -->
    <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/>
     
    <!-- 将切面应用到自定义的切面处理器上,-9999保证该切面优先级最高执行 -->
    <aop:aspect ref="dataSourceAspect" order="-9999">
    <aop:before method="before" pointcut-ref="txPointcut" />
    </aop:aspect>
    </aop:config>
      还可以使用事务策略规则匹配。使用事务管理策略中的规则匹配,比通过方法名匹配更好。
    <!-- 定义AOP切面处理器 -->
    <bean class="cn.itcast.usermanage.spring.DataSourceAspect" id="dataSourceAspect">
    <!-- 指定事务策略 -->
    <property name="txAdvice" ref="txAdvice"/>
    <!-- 指定slave方法的前缀(非必须) -->
    <property name="slaveMethodStart" value="query,find,get"/>
    </bean>
       8、业务实现
     
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
     
    import org.apache.commons.lang3.StringUtils;
    import org.aspectj.lang.JoinPoint;
    import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
    import org.springframework.transaction.interceptor.TransactionAttribute;
    import org.springframework.transaction.interceptor.TransactionAttributeSource;
    import org.springframework.transaction.interceptor.TransactionInterceptor;
    import org.springframework.util.PatternMatchUtils;
    import org.springframework.util.ReflectionUtils;
     
    /**
    * 定义数据源的AOP切面,该类控制了使用Master还是Slave。
    *
    * 如果事务管理中配置了事务策略,则采用配置的事务策略中的标记了ReadOnly的方法是用Slave,其它使用Master。
    *
    * 如果没有配置事务管理的策略,则采用方法名匹配的原则,以query、find、get开头方法用Slave,其它用Master。
    *
    * @author zhijun
    *
    */
    public class DataSourceAspect {
     
    private List<String> slaveMethodPattern = new ArrayList<String>();
     
    private static final String[] defaultSlaveMethodStart = new String[]{ "query", "find", "get" };
     
    private String[] slaveMethodStart;
     
    /**
    * 读取事务管理中的策略
    *
    * @param txAdvice
    * @throws Exception
    */
    @SuppressWarnings("unchecked")
    public void setTxAdvice(TransactionInterceptor txAdvice) throws Exception {
    if (txAdvice == null) {
    // 没有配置事务管理策略
    return;
    }
    //从txAdvice获取到策略配置信息
    TransactionAttributeSource transactionAttributeSource = txAdvice.getTransactionAttributeSource();
    if (!(transactionAttributeSource instanceof NameMatchTransactionAttributeSource)) {
    return;
    }
    //使用反射技术获取到NameMatchTransactionAttributeSource对象中的nameMap属性值
    NameMatchTransactionAttributeSource matchTransactionAttributeSource = (NameMatchTransactionAttributeSource) transactionAttributeSource;
    Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
    nameMapField.setAccessible(true); //设置该字段可访问
    //获取nameMap的值
    Map<String, TransactionAttribute> map = (Map<String, TransactionAttribute>) nameMapField.get(matchTransactionAttributeSource);
     
    //遍历nameMap
    for (Map.Entry<String, TransactionAttribute> entry : map.entrySet()) {
    if (!entry.getValue().isReadOnly()) {//判断之后定义了ReadOnly的策略才加入到slaveMethodPattern
    continue;
    }
    slaveMethodPattern.add(entry.getKey());
    }
    }
     
    /**
    * 在进入Service方法之前执行
    *
    * @param point 切面对象
    */
    public void before(JoinPoint point) {
    // 获取到当前执行的方法名
    String methodName = point.getSignature().getName();
     
    boolean isSlave = false;
     
    if (slaveMethodPattern.isEmpty()) {
    // 当前Spring容器中没有配置事务策略,采用方法名匹配方式
    isSlave = isSlave(methodName);
    } else {
    // 使用策略规则匹配
    for (String mappedName : slaveMethodPattern) {
    if (isMatch(methodName, mappedName)) {
    isSlave = true;
    break;
    }
    }
    }
     
    if (isSlave) {
    // 标记为读库
    DynamicDataSourceHolder.markSlave();
    } else {
    // 标记为写库
    DynamicDataSourceHolder.markMaster();
    }
    }
     
    /**
    * 判断是否为读库
    *
    * @param methodName
    * @return
    */
    private Boolean isSlave(String methodName) {
    // 方法名以query、find、get开头的方法名走从库
    return StringUtils.startsWithAny(methodName, getSlaveMethodStart());
    }
     
    /**
    * 通配符匹配
    *
    * Return if the given method name matches the mapped name.
    * <p>
    * The default implementation checks for "xxx*", "*xxx" and "*xxx*" matches, as well as direct
    * equality. Can be overridden in subclasses.
    *
    * @param methodName the method name of the class
    * @param mappedName the name in the descriptor
    * @return if the names match
    * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
    */
    protected boolean isMatch(String methodName, String mappedName) {
    return PatternMatchUtils.simpleMatch(mappedName, methodName);
    }
     
    /**
    * 用户指定slave的方法名前缀
    * @param slaveMethodStart
    */
    public void setSlaveMethodStart(String[] slaveMethodStart) {
    this.slaveMethodStart = slaveMethodStart;
    }
     
    public String[] getSlaveMethodStart() {
    if(this.slaveMethodStart == null){
    // 没有指定,使用默认
    return defaultSlaveMethodStart;
    }
    return slaveMethodStart;
    }
     
    }
     
     
    四、深入研究方向
      1、主库插入记录,从库完成对主库的同步前查询记录为空问题
      2、如何实现一主多从
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;
     
    import javax.sql.DataSource;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
    import org.springframework.util.ReflectionUtils;
     
    /**
    * 定义动态数据源,实现通过集成Spring提供的AbstractRoutingDataSource,只需要实现determineCurrentLookupKey方法即可
    *
    * 由于DynamicDataSource是单例的,线程不安全的,所以采用ThreadLocal保证线程安全,由DynamicDataSourceHolder完成。
    *
    * @author zhijun
    *
    */
    public class DynamicDataSource extends AbstractRoutingDataSource {
     
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicDataSource.class);
     
    private Integer slaveCount;
     
    // 轮询计数,初始为-1,AtomicInteger是线程安全的
    private AtomicInteger counter = new AtomicInteger(-1);
     
    // 记录读库的key
    private List<Object> slaveDataSources = new ArrayList<Object>(0);
     
    @Override
    protected Object determineCurrentLookupKey() {
    // 使用DynamicDataSourceHolder保证线程安全,并且得到当前线程中的数据源key
    if (DynamicDataSourceHolder.isMaster()) {
    Object key = DynamicDataSourceHolder.getDataSourceKey();
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("当前DataSource的key为: " + key);
    }
    return key;
    }
    Object key = getSlaveKey();
    if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("当前DataSource的key为: " + key);
    }
    return key;
     
    }
     
    @SuppressWarnings("unchecked")
    @Override
    public void afterPropertiesSet() {
    super.afterPropertiesSet();
     
    // 由于父类的resolvedDataSources属性是私有的子类获取不到,需要使用反射获取
    Field field = ReflectionUtils.findField(AbstractRoutingDataSource.class, "resolvedDataSources");
    field.setAccessible(true); // 设置可访问
     
    try {
    Map<Object, DataSource> resolvedDataSources = (Map<Object, DataSource>) field.get(this);
    // 读库的数据量等于数据源总数减去写库的数量
    this.slaveCount = resolvedDataSources.size() - 1;
    for (Map.Entry<Object, DataSource> entry : resolvedDataSources.entrySet()) {
    if (DynamicDataSourceHolder.MASTER.equals(entry.getKey())) {
    continue;
    }
    slaveDataSources.add(entry.getKey());
    }
    } catch (Exception e) {
    LOGGER.error("afterPropertiesSet error! ", e);
    }
    }
     
    /**
    * 轮询算法实现
    *
    * @return
    */
    public Object getSlaveKey() {
    // 得到的下标为:0、1、2、3……
    Integer index = counter.incrementAndGet() % slaveCount;
    if (counter.get() > 9999) { // 以免超出Integer范围
    counter.set(-1); // 还原
    }
    return slaveDataSources.get(index);
    }
     
    }
    五、面试点
      1、读写分离的实现方式及各自优劣势

  • 相关阅读:
    Python操作Excel
    unittest单元测试生成HTML测试报告
    pycharm安装 package报错:module 'pip' has no attribute 'main'
    Jenkins关闭、重启,Jenkins服务的启动、停止方法。
    selenium如何获取已定位元素的属性值?
    本周学习小结(23/03
    本周学习小结(16/03
    本周学习小结(09/03
    本周学习小结(02/03
    本周学习小结(24/02
  • 原文地址:https://www.cnblogs.com/qhj348770376/p/9245135.html
Copyright © 2011-2022 走看看