zoukankan      html  css  js  c++  java
  • Java多线程--CAS

    在Java多线程并发的情况下同时对一个变量进行操作会出现线程安全的问题,假如我们现在使用20个线程对一个变量不停累加1,代码如下:

     1 public class ThreadDemo implements Runnable {
     2     private int num = 0;
     3     @Override
     4     public void run() {
     5         try {
     6             Thread.sleep(1000);
     7         } catch (InterruptedException e) {
     8             e.printStackTrace();
     9         }
    10         num++;
    11         System.out.println(Thread.currentThread().getName() + "处理成功,num = " + num);
    12     }
    13 
    14     public static void main(String[] args) {
    15         ThreadDemo demo = new ThreadDemo();
    16         for (int i = 0; i < 20; i++) {
    17             new Thread(demo,"线程" + i).start();
    18         }
    19     }
    20 }

    理想情况是累加到20,但实际运行的结果如下:

     1 线程2处理成功,num = 3
     2 线程3处理成功,num = 3
     3 线程1处理成功,num = 3
     4 线程0处理成功,num = 3
     5 线程5处理成功,num = 4
     6 线程7处理成功,num = 6
     7 线程9处理成功,num = 8
     8 线程4处理成功,num = 5
     9 线程6处理成功,num = 4
    10 线程8处理成功,num = 7
    11 线程10处理成功,num = 9
    12 线程11处理成功,num = 11
    13 线程13处理成功,num = 12
    14 线程12处理成功,num = 11
    15 线程14处理成功,num = 13
    16 线程15处理成功,num = 14
    17 线程19处理成功,num = 18
    18 线程18处理成功,num = 18
    19 线程16处理成功,num = 16
    20 线程17处理成功,num = 16

    实际运行的结果可能有多种情况,因为在Java多线程并发的情况下会有这种安全问题,导致结果不准确,针对这种问题,有以下几种解决方案

    1、方案一:synchronized

     1 public class ThreadDemo implements Runnable {
     2 
     3     private int num = 0;
     4 
     5     private synchronized void increase(){
     6         num++;
     7     }
     8 
     9     @Override
    10     public void run() {
    11         try {
    12             Thread.sleep(1000);
    13         } catch (InterruptedException e) {
    14             e.printStackTrace();
    15         }
    16         increase();
    17         System.out.println(Thread.currentThread().getName() + "处理成功,num = " + num);
    18     }
    19 
    20     public static void main(String[] args) {
    21         ThreadDemo demo = new ThreadDemo();
    22         for (int i = 0; i < 20; i++) {
    23             new Thread(demo,"线程" + i).start();
    24         }
    25     }
    26 }

    运行结果如下:

     1 线程0处理成功,num = 1
     2 线程1处理成功,num = 2
     3 线程2处理成功,num = 3
     4 线程3处理成功,num = 4
     5 线程4处理成功,num = 5
     6 线程5处理成功,num = 6
     7 线程6处理成功,num = 7
     8 线程7处理成功,num = 8
     9 线程12处理成功,num = 13
    10 线程14处理成功,num = 15
    11 线程15处理成功,num = 16
    12 线程16处理成功,num = 17
    13 线程10处理成功,num = 11
    14 线程9处理成功,num = 10
    15 线程8处理成功,num = 10
    16 线程13处理成功,num = 14
    17 线程11处理成功,num = 13
    18 线程17处理成功,num = 18
    19 线程18处理成功,num = 19
    20 线程19处理成功,num = 20

    这个时候,代码就是线程安全的了,因为我们加了synchronized,也就是让每个线程要进入increase()方法之前先得尝试加锁,同一时间只有一个线程能加锁,其他线程需要等待锁。通过这样处理,就可以保证换个data每次都会累加1,不会出现数据错乱的问题。但是,如此简单的data++操作,都要加一个重磅的synchronized锁来解决多线程并发问题,一个接一个的排队,加锁,处理数据,释放锁,下一个再进来,有点大材小用,synchronized是可以解决更加复杂的并发编程场景和问题的。

    2、更高效方案:Atomic类

    对于这种简单的data++类的操作,java并发包下面提供了一系列的Atomic原子类,比如说AtomicInteger,可以保证多线程并发安全的情况下,高性能的并发更新一个数值,代码如下:

     1 public class ThreadDemo implements Runnable {
     2 
     3     private AtomicInteger num = new AtomicInteger(0);
     4 
     5     @Override
     6     public void run() {
     7         try {
     8             Thread.sleep(1000);
     9         } catch (InterruptedException e) {
    10             e.printStackTrace();
    11         }
    12         num.incrementAndGet();
    13         System.out.println(Thread.currentThread().getName() + "处理成功,num = " + num);
    14     }
    15 
    16     public static void main(String[] args) {
    17         ThreadDemo demo = new ThreadDemo();
    18         for (int i = 0; i < 20; i++) {
    19             new Thread(demo,"线程" + i).start();
    20         }
    21     }
    22 }

    运行结果如下:

    线程1处理成功,num = 2
    线程0处理成功,num = 2
    线程2处理成功,num = 3
    线程4处理成功,num = 5
    线程3处理成功,num = 4
    线程5处理成功,num = 6
    线程6处理成功,num = 7
    线程7处理成功,num = 8
    线程9处理成功,num = 11
    线程11处理成功,num = 12
    线程13处理成功,num = 14
    线程15处理成功,num = 16
    线程16处理成功,num = 17
    线程17处理成功,num = 18
    线程10处理成功,num = 11
    线程8处理成功,num = 11
    线程14处理成功,num = 15
    线程12处理成功,num = 13
    线程18处理成功,num = 19
    线程19处理成功,num = 20

    多个线程可以并发的执行AtomicInteger的incrementAndGet()方法,意思就是data的值累加1,接着返回累加后最新的值,这个代码里就没有看到加锁和释放锁

    1   public final int getAndAddInt(Object var1, long var2, int var4) {
    2         int var5;
    3         do {
    4             var5 = this.getIntVolatile(var1, var2);
    5         } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    6 
    7         return var5;
    8     }

    实际上,Atomic原子类底层用的不是传统意义的锁机制,而是无锁化的CAS机制,通过CAS机制保证多线程修改一个数值的安全性,有点乐观锁的意思,先取值,累加的时候会比较原来的值是否改变,如果改变就不会执行,会再次获取最新值(Compare And Swap)

     3、Java8对CAS的优化:LongAdder

    CAS机制可以轻量级的实现线程安全,但CAS也有一个问题就是在多线程同时更改一个变量的值的时候可能会循环多次才会变更成功,在高并发的情况下这种情况会更常见,耗费大量系统资源在死循环上面,因此Java8推出了LongAdder

      1   /**
      2      * Adds the given value.
      3      *
      4      * @param x the value to add
      5      */
      6     public void add(long x) {
      7         Cell[] as; long b, v; int m; Cell a;
      8         if ((as = cells) != null || !casBase(b = base, b + x)) {
      9             boolean uncontended = true;
     10             if (as == null || (m = as.length - 1) < 0 ||
     11                 (a = as[getProbe() & m]) == null ||
     12                 !(uncontended = a.cas(v = a.value, v + x)))
     13                 longAccumulate(x, null, uncontended);
     14         }
     15     }
     16 
     17   /**
     18      * Handles cases of updates involving initialization, resizing,
     19      * creating new Cells, and/or contention. See above for
     20      * explanation. This method suffers the usual non-modularity
     21      * problems of optimistic retry code, relying on rechecked sets of
     22      * reads.
     23      *
     24      * @param x the value
     25      * @param fn the update function, or null for add (this convention
     26      * avoids the need for an extra field or function in LongAdder).
     27      * @param wasUncontended false if CAS failed before call
     28      */
     29     final void longAccumulate(long x, LongBinaryOperator fn,
     30                               boolean wasUncontended) {
     31         int h;
     32         if ((h = getProbe()) == 0) {
     33             ThreadLocalRandom.current(); // force initialization
     34             h = getProbe();
     35             wasUncontended = true;
     36         }
     37         boolean collide = false;                // True if last slot nonempty
     38         for (;;) {
     39             Cell[] as; Cell a; int n; long v;
     40             if ((as = cells) != null && (n = as.length) > 0) {
     41                 if ((a = as[(n - 1) & h]) == null) {
     42                     if (cellsBusy == 0) {       // Try to attach new Cell
     43                         Cell r = new Cell(x);   // Optimistically create
     44                         if (cellsBusy == 0 && casCellsBusy()) {
     45                             boolean created = false;
     46                             try {               // Recheck under lock
     47                                 Cell[] rs; int m, j;
     48                                 if ((rs = cells) != null &&
     49                                     (m = rs.length) > 0 &&
     50                                     rs[j = (m - 1) & h] == null) {
     51                                     rs[j] = r;
     52                                     created = true;
     53                                 }
     54                             } finally {
     55                                 cellsBusy = 0;
     56                             }
     57                             if (created)
     58                                 break;
     59                             continue;           // Slot is now non-empty
     60                         }
     61                     }
     62                     collide = false;
     63                 }
     64                 else if (!wasUncontended)       // CAS already known to fail
     65                     wasUncontended = true;      // Continue after rehash
     66                 else if (a.cas(v = a.value, ((fn == null) ? v + x :
     67                                              fn.applyAsLong(v, x))))
     68                     break;
     69                 else if (n >= NCPU || cells != as)
     70                     collide = false;            // At max size or stale
     71                 else if (!collide)
     72                     collide = true;
     73                 else if (cellsBusy == 0 && casCellsBusy()) {
     74                     try {
     75                         if (cells == as) {      // Expand table unless stale
     76                             Cell[] rs = new Cell[n << 1];
     77                             for (int i = 0; i < n; ++i)
     78                                 rs[i] = as[i];
     79                             cells = rs;
     80                         }
     81                     } finally {
     82                         cellsBusy = 0;
     83                     }
     84                     collide = false;
     85                     continue;                   // Retry with expanded table
     86                 }
     87                 h = advanceProbe(h);
     88             }
     89             else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
     90                 boolean init = false;
     91                 try {                           // Initialize table
     92                     if (cells == as) {
     93                         Cell[] rs = new Cell[2];
     94                         rs[h & 1] = new Cell(x);
     95                         cells = rs;
     96                         init = true;
     97                     }
     98                 } finally {
     99                     cellsBusy = 0;
    100                 }
    101                 if (init)
    102                     break;
    103             }
    104             else if (casBase(v = base, ((fn == null) ? v + x :
    105                                         fn.applyAsLong(v, x))))
    106                 break;                          // Fall back on using base
    107         }
    108     }

    LongAdder就是尝试使用分段CAS的方式来提升高并发执行CAS操作的性能,当并发更新的线程数量过多,其内部会搞一个Cell数组,每个数组是一个数值分段,这时,让大量的线程分别去对不同Cell内部的value值进行CAS累加操作,把CAS计算压力分散到了不同的Cell分段数值中,这样就可以大幅度的降低多线程并发更新同一个数值时出现的无限循环的问题,而且他内部实现了自动分段迁移的机制,也就是如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作。

     1   /**
     2      * Returns the current sum.  The returned value is <em>NOT</em> an
     3      * atomic snapshot; invocation in the absence of concurrent
     4      * updates returns an accurate result, but concurrent updates that
     5      * occur while the sum is being calculated might not be
     6      * incorporated.
     7      *
     8      * @return the sum
     9      */
    10     public long sum() {
    11         Cell[] as = cells; Cell a;
    12         long sum = base;
    13         if (as != null) {
    14             for (int i = 0; i < as.length; ++i) {
    15                 if ((a = as[i]) != null)
    16                     sum += a.value;
    17             }
    18         }
    19         return sum;
    20     }

    最后,从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回

    从LongAdder分段加锁的实现逻辑中,我们也可以对于一些并发量较大,持续时间较长的不适用缓存模式的抢购类项目的乐观锁进行改造,假如商品有1000个库存,那么完全可以给拆成20个库存段,可以在数据库的表里建20个库存字段,比如stock_01,stock_02,以此类推,总之,就是把你的1000件库存给他拆开,每个库存段是50件库存,比如stock_01对应50件库存,stock_02对应50件库存。接着,每秒1000个请求过来了,通过简单的随机算法,每个请求都是随机在20个分段库存里,选择一个进行加锁。这样有最多20个下单请求一起执行,每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存,通过这种方式提升并发量,提高用户体验。

     

  • 相关阅读:
    mongodb教程
    redis高级知识
    memcached删除机制与大数据缓存问题
    nginx 运维基础
    mysql 集群与分区
    Memcached之缓存雪崩,缓存穿透,缓存预热,缓存算法
    git cz配置
    Angular零碎知识点-持续补充
    Vue学习笔记-组件
    Vue学习笔记-自定义指令生命周期函数
  • 原文地址:https://www.cnblogs.com/ding-dang/p/11032247.html
Copyright © 2011-2022 走看看