zoukankan      html  css  js  c++  java
  • 利用双重检查锁定和CAS算法:解决并发下数据库的一致性问题

    背景

    ​ 最近有一个场景遇到了数据库的并发问题。现在先由我来抽象一下,去掉不必要的繁杂业务。
    ​ 数据库表book存储着每本书的阅读量,一开始数据库是空的,不存在任何的数据。当用户访问接口的时候,判断表book是否存在此书的记录,如果不存在,即插入一条新记录,而且阅读量设置为1;当下个用户再阅读此书时,再调用接口就直接将此书的阅读量增加1,而不用再插入新记录。

    并发下出现的问题

    ​ 下面看一下伪代码:

    public void addOrUpdateBook{
        Book oldBook = this.bookMapper.selectByBookName(bookName);
        if (oldBook == null){ // 1、判断书本记录是否为空
            this.bookMapper.insertSelective(b);  // 2、新增书本记录
        }else{
            Integer updateCount = this.bookMapper.updateReadFrequency(book); // 3、更新阅读量
        }
    }
    

    1、插入多条相同的书本记录:

    ​ 假如此时线程A在【1】处判断出书本记录为空,然后在【2】处进行创建书本记录;如果线程B在线程A提交前又进来【1】处判断书本是否为空,因为线程A还没创建完和提交事务,数据库中的书本记录还是为空,所以线程B也开始进行书本记录的创建。最后的结果:数据库中存在两条此书的记录。

    2、书本的阅读量更新后不准确

    ​ 假如书本《Java并发编程的艺术》的阅读量为10,然后此时线程A进行阅读量加1的操作,但是在线程A提交事务前,线程B也进行阅读量加1的操作,那么就是不管是线程A还是线程B,都是在阅读量10的基础上加1,最后当两个线程都执行完时,《Java并发编程的艺术》的阅读量就是11,这和正确结果阅读量12是不一样的!

    思考

    ​ 此时我们都知道了,并发会导致数据库数据的一致性问题。那么,我们该怎么解决呢?

    1、方法加锁

    ​ 最简单的做法,就是直接在方法那里加synchronized关键字,将整个方法锁起来,每个请求只能一个一个按照顺序执行,那么插入和更新的并发问题都不存在了。

    public synchronized void addOrUpdateBook{
        Book oldBook = this.bookMapper.selectByBookName(bookName);
        if (oldBook == null){ // 1、判断书本记录是否为空
            this.bookMapper.insertSelective(b);  // 2、新增书本记录
        }else{
            Integer updateCount = this.bookMapper.updateReadFrequency(book); // 3、更新阅读量
        }
    }
    

    2、双重检查锁定

    ​ 上面的方法性能是最低的,未获取到锁的所有请求都会被阻塞在方法外面(不管是否存在并发问题)。其实,我们可以利用双重检查锁定来解决这个锁性能低问题,做法也是非常的简单,就是我们只管对可能出现插入并发问题的代码进行上锁就行了,就是说从同步方法改为同步块,不再锁住不必要的代码。

    public void addOrUpdateBook{
    	Book oldBook = this.bookMapper.selectByBookName(bookName);
    	if (oldBook == null){ // 1、判断书本记录是否为空
    	    // 加锁
    	    synchronized (this){
    	        oldBook = this.bookMapper.selectByBookName(bookName);
    	        if (oldBook == null){
    	            BeanUtil.copyProperties(query,b);
    	            this.bookMapper.insertSelective(b); // 2、新增书本记录
    	        }else{
    	            Integer updateCount = this.bookMapper.updateReadFrequency(book); // 3、更新阅读量 
    	        }
    	    }
    	}else{
    	    Integer updateCount = this.bookMapper.updateReadFrequency(book); // 4、更新阅读量
    	}
    }
    

    ​ 我们可以看到:假如线程A此时在【2】处进行记录新增,而此时线程B也在【1】处判断书本记录为空,然后被阻塞在同步块外,当线程A执行完释放锁后,线程B获取到锁,但是此时书本记录不再为空,线程B就直接更新阅读量而不再插入书本记录。当然了,后续的所有请求在第一重就能判断出书本记录不为空,然后直接更新阅读量。

    ​ 那么就是说,我们的插入并发问题解决啦,而且性能比同步方法高不少。但是呢,更新的并发问题还没解决,为什么我没有同时也锁住更新阅读量的代码呢?因为我觉得没啥必要,因为为了减少线程上下文的切换,我们都推荐无锁并发编程,那么我们能怎么做呢?下面将介绍如何参考CAS算法来防止更新的并发问题。

    3、CAS算法

    ​ 首先,我们的表需要增加一个字段:版本号。对,这时候大家可能想到了乐观锁。没错啦~CAS算法其实就是一种乐观锁。它的原理是:比较再交换;更新时,当我们保存的旧值和数据库的值一致时,我们就能将旧值更新为我们的新值。最后,我们利用死循环来不断循环保证最后能更新成功。大家可能会疑惑死循环会不会导致性能很低?其实还好啦,起码能避免了线程的上下文切换,而且,一般同时的请求量也不会这么离谱(我们公司),并发量很大可能要做其他的方案了。下面我们先上一下代码:

    public Boolean addOrUpdateBook(BookQuery query) {
         boolean flag = true;
         // 使用双重检查锁定来处理新增的并发问题
         Book b = Book.builder().bookName(query.getBookName()).build();
         Book oldBook = this.bookMapper.selectOne(b);
         if (oldBook == null){
             // 加锁
             synchronized (this){
                 oldBook = this.bookMapper.selectOne(b);
                 if (oldBook == null){
                     BeanUtil.copyProperties(query,b);
                     this.bookMapper.insertSelective(b);
                 }else{
                     updateBook(query);
                 }
             }
         }else{
             updateBook(query);
         }
         return flag;
     }
    
     /**
      * 参考CAS的无锁算法来处理更新的并发问题(利用死循环+版本号)
      * @param query
      */
     private void updateBook(BookQuery query){
         // 参考cas
         for (;;){
             // 获取当前记录的版本号
             Integer version = this.bookMapper.getVersionByBookName(query.getBookName());
             query.setVersion(version);
             // 根据书名和版本号进行阅读量更新
             Integer updateCount = this.bookMapper.updateBookByVersion(query);
             if (updateCount != null && updateCount.equals(1)){
                 // 如果更新成功就跳出循环
                 break;
             }
         }
     }
    

    ​ 更新阅读量的sql:

    /**
      * 根据书名和版本号更新book
      * @param book
      * @return
    */
    @Update("update book set version = version+1,read_frequency = #{readFrequency} where book_name = #{bookName} and version = #{version}")
    Integer updateBookByVersion(BookQuery book);
    

    ​ 这里的代码就不多做解释了,看一下注释就知道是什么原理,是非常简单易懂滴~

    4、利用Redis做分布式锁

    ​ 再回到加锁的那里,我们可以发现,当我们做微服务时,一般每个服务都会是多个实例,或者是单体应用的实例部署,我们的锁就只能针对单体应用有效了,而多个实例还是会导致插入的并发问题。这时候我们必须想到:分布式锁!

    ​ 我知道的现在主要做分布式锁有两种方式,一种是基于Redis的分布式锁,另外一种是基于Zookeeper的分布式锁。简单分析一下上面两种锁的优缺点:从可靠性上来说,Zookeeper分布式锁有好于Redis分布式锁;而从性能上来说,Redis分布式锁要好于Zookeeper分布式锁,毕竟Redis是纯内存操作的,性能是想当的好,号称每秒可以处理10万次读写操作呢,所以我最终选择了基于Redis的分布式锁。

    ​ 不过还有一个问题就是:大家玩过redis的都知道,redis只保证单个操作是具有事务的,是原子性的,多个操作就不能保证原子性了。因为我们一般加锁的做法都是,使锁具有超时的特征,避免一个请求无条件的等待锁,一直的阻塞导致系统CPU飙升。所以单单利用redis的方法我们做不到具有超时特征的分布式锁。当然了,现在有两种比较好的解决方案:一种是利用redis+lua脚本,一种是利用开源的框架Redisson。当然了,有简单的必须就利用简单的了,下面开始介绍如何利用Redisson开发分布式锁。

    ​ 1)加入redis和redisson依赖

    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
          <groupId>org.redisson</groupId>
          <artifactId>redisson</artifactId>
          <version>3.11.3</version>
    </dependency>
    

    ​ 2)创建Redisson的工具类

    package com.hyf.utils;
    
    import org.redisson.Redisson;
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    
    /**
     * @author Howinfun
     * @desc Redisson工具类
     * @date 2019/9/2
     */
    public class RedissonUtil {
    
        private static RedissonClient redissonClient;
    
        private RedissonUtil(){
            // 构造redisson实现分布式锁必要的Config
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456").setDatabase(1);
            // 构造RedissonClient
            redissonClient = Redisson.create(config);
        }
    
        public static final RedissonUtil INSTANCE = new RedissonUtil();
    
        /**
         * 设定锁定资源名称,返回锁
         * @param name
         * @return
         */
        public RLock getLock(String name){
            return redissonClient.getLock(name);
        }
    }
    
    

    ​ 3)修改业务代码

    public Boolean addOrUpdateBook(BookQuery query) {
        boolean flag = true;
        // 使用双重检查锁定来处理新增的并发问题
        Book b = Book.builder().bookName(query.getBookName()).build();
        Book oldBook = this.bookMapper.selectOne(b);
        if (oldBook == null){
            RLock lock = RedissonUtil.INSTANCE.getLock("addOrUpdateBook");
            try {
                // 尝试获取锁,最多等待10000毫秒,获取锁后1000毫秒自动释放锁
                if (lock.tryLock(10000, 10000, TimeUnit.MILLISECONDS)){
                    oldBook = this.bookMapper.selectOne(b);
                    if (oldBook == null){
                        BeanUtil.copyProperties(query,b);
                        this.bookMapper.insertSelective(b);
                    }else{
                        updateBook(query);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            }finally {
                // 最后记得释放锁
                lock.unlock();
            }
        }else{
            updateBook(query);
        }
        return flag;
    }
    
    /**
      * 参考CAS的无锁算法来处理更新的并发问题(利用死循环+版本号)
      * @param query
    */
    private void updateBook(BookQuery query){
        // 参考cas
        for (;;){
            Integer version = this.bookMapper.getVersionByBookName(query.getBookName());
            query.setVersion(version);
            Integer updateCount = this.bookMapper.updateBookByVersion(query);
            if (updateCount != null && updateCount.equals(1)){
                break;
            }
        }
    }
    

    ​ 到这里,我们的代码已经比较好使的了,不但能预防数据库插入和更新的并发问题,还能在分布式环境下也好使!

    ​ 我猜大家是不怎么相信的了,那么下面我将用JMeter来测试一下。

    ​ 1)首先创建一个线程组:一共有10个线程,执行1次。

    ​ 2)创建Http请求:消息体中的数据是循环读取CSV数据文件里的数据的

    ​ 3)创建Http信息头管理:加上Content-Type

    ​ 4)创建并指定CSV数据文件

    ​ 5)点击启动。

    ​ 观察结果树:可以发现所有的请求都是请求成功的,没有报错。

    ​ 再观察一下控制台,可以发现只有一条插入sql和九条更新sql:

    2019-09-03 09:28:39.888 DEBUG 8020 --- [nio-8888-exec-7] com.hyf.mapper.BookMapper.selectOne      : ==> Parameters: Java并发编程的艺术(String)
    2019-09-03 09:28:39.889 DEBUG 8020 --- [nio-8888-exec-7] com.hyf.mapper.BookMapper.selectOne      : <==      Total: 0
    2019-09-03 09:28:39.937 DEBUG 8020 --- [nio-8888-exec-7] c.hyf.mapper.BookMapper.insertSelective  : ==>  Preparing: INSERT INTO book ( id,book_name ) VALUES( ?,? ) 
    2019-09-03 09:28:39.937 DEBUG 8020 --- [nio-8888-exec-7] c.hyf.mapper.BookMapper.insertSelective  : ==> Parameters: null, Java并发编程的艺术(String)
    2019-09-03 09:28:39.939 DEBUG 8020 --- [nio-8888-exec-7] c.hyf.mapper.BookMapper.insertSelective  : <==    Updates: 1
    2019-09-03 09:28:39.950 DEBUG 8020 --- [nio-8888-exec-6] c.h.m.BookMapper.updateBookByVersion     : ==>  Preparing: update book set version = version+1,read_frequency = read_frequency+1 where book_name = ? and version = ? 
    2019-09-03 09:28:39.951 DEBUG 8020 --- [nio-8888-exec-6] c.h.m.BookMapper.updateBookByVersion     : ==> Parameters: Java并发编程的艺术(String), 1(Integer)
    2019-09-03 09:28:39.953 DEBUG 8020 --- [nio-8888-exec-6] c.h.m.BookMapper.updateBookByVersion     : <==    Updates: 1
    2019-09-03 09:28:39.957 DEBUG 8020 --- [nio-8888-exec-5] c.h.m.BookMapper.updateBookByVersion     : ==>  Preparing: update book set version = version+1,read_frequency = read_frequency+1 where book_name = ? and version = ? 
    2019-09-03 09:28:39.957 DEBUG 8020 --- [nio-8888-exec-5] c.h.m.BookMapper.updateBookByVersion     : ==> Parameters: Java并发编程的艺术(String), 2(Integer)
    2019-09-03 09:28:39.959 DEBUG 8020 --- [nio-8888-exec-5] c.h.m.BookMapper.updateBookByVersion     : <==    Updates: 1
    。。。。。。。
    

    ​ 最后再看一下数据库:我们可以看到只有一条记录,而且阅读量为10,非常的准确!

    如果大家对此demo感兴趣的话,可以到github上和码云上拉取项目,项目里头还包含JMeter的测试用例噢:
    GitHub
    码云

    总结

    ​ 平时我们程序猿真的要多看书,虽然我自己也没看多少书,也没能好好坚持,但是从上个月开始我就下定决心好好看书了。也是因为最近在阅读《Java并发编程的艺术》,所以才有上面的思考和方案!

    ​ 如果大家对我的读书笔记和思维导图感兴趣,可以到这里看看:Java并发编程的艺术-阅读笔记和思维导图

  • 相关阅读:
    java字符串实现正序和倒序输出
    暑假前挑战赛1—— A,B题解
    深搜
    poj 1200 Crazy Search
    poj 1840 Eqs (hash)
    Choose the best route
    一个人的旅行
    畅通工程续
    最短路基础算法
    完全背包问题
  • 原文地址:https://www.cnblogs.com/Howinfun/p/11612653.html
Copyright © 2011-2022 走看看