zoukankan      html  css  js  c++  java
  • 4-1 线程安全性-原子性-atomic-1

    我们发现在不做任何同步的情况下,我们计算的累加结果是错误的。

    com.mmall.concurrency.example.count.CountExample2

    C:UsersHONGZHENHUAimoocconcurrencysrcmainjavacommmallconcurrencyexamplecountCountExample2.java

    package com.mmall.concurrency.example.count;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j
    @ThreadSafe
    public class CountExample2 {
    
         // 请求总数
         public static int clientTotal = 5000;//1000个请求
    
         // 同时并发执行的线程数
         public static int threadTotal = 200;//允许并发的线程数是50
    
         public static AtomicInteger count = new AtomicInteger(0);
    
         public static void main(String[] args) throws Exception {
             ExecutorService executorService = Executors.newCachedThreadPool();
             final Semaphore semaphore = new Semaphore(threadTotal);
             final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
             for (int i = 0; i < clientTotal; i++) {
                 executorService.execute(()-> {
                     try {
                         semaphore.acquire();
                         add();
                         semaphore.release();
                     } catch (Exception e) {
                         log.error("exception",e);
                     }
                     countDownLatch.countDown();
                 });
             }
                  countDownLatch.await();
                  executorService.shutdown();
                 //log.info("count:{}",count);
                  log.info("count:{}",count.get());
    
         }
    
         private static void add() {
             //count++;
             count.incrementAndGet();
             // count.getAndIncrement();
         }
    }

    把计数的值从int换成了AtomicInteger,看一下AtomicInteger的incrementAndGet方法的源码实现。AtomicInteger的源码实现使用了一个unsafe的类,unsafe提供了一个方法叫做getAndAddInt,getAndAddInt这个方法不是特别关键的,核心关键是getAndAddInt它的实现。

    compareAndSwapInt这个方法需要大家特别注意。compareAndSwapInt它其实是用native标识的方法。native这个代表是Java底层的方法,不是我们通过Java去实现的。Object var1是传过来的count这个对象,第二个值long var2是我们当前的值,比如我想执行的是2+1=3这个操作,当前这个第二个参数var2它等于2,第三个值var4等于1,如果当前的值var2跟底层的这个值var5相同的话,那么把它var5更新成后面的这个值va5+var4。当我们一个方法进来的时候,long var2它这个值是2,这个时候我们第一次取出来的变量var5的值应该也等于2,但是当我们在执行进行更新层增删的时候呢,可能会被别的线程更改,因此它这里要判断如果我当前这个值var2跟我期望的值var5是相同的话,就是这里面取出来的值var2也等于2的时候,那么我才允许它更新成3,否则重新取出来当前这个变量var5,然后这里面的变量var2相当于是重新从我当前的这个count对象里面取一次也会变成3,相当于继续判断,如果当前的值var2=3,与我底层的值var5相同也等于3的话,那么我就把它变成3+1=4的这个值。通过这样不停地循环来判断,最后保证的是,期望的是一个值,与底层的值完全相同的时候,才执行对应的+1的这个值,把底层的值给覆盖掉。

    compareAndSwapInt这个方法的核心就是CAS的核心。刚才我们介绍的是整型值的处理,对于其他的类型比如long、double、对象的值它们都可以通过对应的方法来进行处理,核心都是通过compareAndSwap的方式来进行操作。

    关于CAS的实践我们就先讲到这里,大家一定要记住这里面它的实现的原理,是拿当前这个对象里的值var2和底层的值var5来进行对比,如果当前的值跟底层的值它们一样的时候,才执行对应的操作,这就是它的核心的原理。如果不一样的话那就不停地取最新的值,直到它们相同的时候才进行这个操作。如果大家不明白为什么这个count对象里面的值会和正常存储数据的底层的值不一样的话,大家可以看一下工作内存和主内存的关系,count里面存的值其实就属于工作内存,底层其实就是主内存,它们的值不一定完全一样的,所以我们做一些同步的操作才能保证它们在某个时刻是一样的。

    com.mmall.concurrency.example.atomic.AtomicExample1

    C:UsersHONGZHENHUAimoocconcurrencysrcmainjavacommmallconcurrencyexampleatomicAtomicExample1.java

    package com.mmall.concurrency.example.atomic;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample1 {
    
         // 请求总数
         public static int clientTotal = 5000;//1000个请求
    
         // 同时并发执行的线程数
         public static int threadTotal = 200;//允许并发的线程数是50
    
         public static AtomicInteger count = new AtomicInteger(0);
    
         public static void main(String[] args) throws Exception {
             ExecutorService executorService = Executors.newCachedThreadPool();
             final Semaphore semaphore = new Semaphore(threadTotal);
             final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
             for (int i = 0; i < clientTotal; i++) {
                 executorService.execute(()-> {
                     try {
                         semaphore.acquire();
                         add();
                         semaphore.release();
                     } catch (Exception e) {
                         log.error("exception",e);
                     }
                     countDownLatch.countDown();
                 });
             }
                  countDownLatch.await();
                  executorService.shutdown();
                  //log.info("count:{}",count);
    log.info("count:{}",count.get()); }
    private static void add() {
    //count++; count.incrementAndGet();
    // count.getAndIncrement(); } }

    com.mmall.concurrency.example.atomic.AtomicExample2

    C:UsersHONGZHENHUAimoocconcurrencysrcmainjavacommmallconcurrencyexampleatomicAtomicExample2.java

    package com.mmall.concurrency.example.atomic;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample2 {
    
         // 请求总数
         public static int clientTotal = 5000;//1000个请求
    
         // 同时并发执行的线程数
         public static int threadTotal = 200;//允许并发的线程数是50
    
         public static AtomicLong count = new AtomicLong(0);
    
         public static void main(String[] args) throws Exception {
             ExecutorService executorService = Executors.newCachedThreadPool();
             final Semaphore semaphore = new Semaphore(threadTotal);
             final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
             for (int i = 0; i < clientTotal; i++) {
                 executorService.execute(()-> {
                     try {
                         semaphore.acquire();
                         add();
                         semaphore.release();
                     } catch (Exception e) {
                         log.error("exception",e);
                     }
                     countDownLatch.countDown();
                 });
             }
                  countDownLatch.await();
                  executorService.shutdown();
                  //log.info("count:{}",count);
                  log.info("count:{}",count.get());
    
         }
    
         private static void add() {
             //count++;
             count.incrementAndGet();
             // count.getAndIncrement();
         }
    }

    JDK8里面单独新增了一个类LongAdder跟AtomicLong特别像,

      因此我们这个线程安全了是没问题了。为什么有了AtomicLong之后还要新增一个LongAdder这个类?很显然如果增加一个类肯定是有它的优点的。这里LongAdder它的实现我们就不多说了,我们具体说一下LongAdder和AtomicLong它俩对应的优点和缺点。之前我们在讲AtomicInteger的实现机制的时候我们看过CAS的底层实现,它们是在一个死循环内不断地尝试修改目标值直到修改成功。如果竞争不激烈的时候呢,它修改成功的概率很高,否则的话修改失败的概率就会很高,在大量修改失败的时候呢,这些原子操作就会进行多次的循环尝试,因此呢性能会受到一些影响。这里呢有一个知识点,对于普通类型的long和double变量,JVM允许将64位的读操作或者写操作拆成两个32位的操作,那么LongAdder这个类它的实现是基于什么思想呢?它的核心是将热点索引分离,比如说它可以将AtomicLong的内部核心数据value分离成一个数组,每个线程访问时通过哈希系统算法映射到其中一个数字进行计数,而最终的计数结果则为这个数组的求和累加,其中热点数据value它会被分离成多个单元的cell,每个cell独自维护内部的值,当前对象的实际值由所有的cell累计合成,这样的话热点就进行了有效的分离并提高了并行度,这样一来呢LongAdder它相当于是在AtomicLong的基础上将单点的更新压力分散到各个节点上,在第一并发的时候呢通过对base的直接更新,可以很好的保障和Atomic的性能基本一致,而在高并发的时候呢则通过分散提高了性能,这就是我们所说的LongAdder,但是LongAdder也有自己的缺陷,它的缺点是在统计的时候如果有并发更新,可能会导致统计的数据有些误差。实际使用中,在处理高并发计数的时候,我们可以优先使用LongAdder,而不是继续使用AtomicLong,当然了在线程竞争很低的情况下进行计数,使用Atomic还是更简单、更直接一些,并且效率也会稍微高一点点。其他的情况下,比如序列号生成啦,这种情况下需要准确的数值,全局唯一的AtomicLong才是正确的选择,这个时候就不适合使用LongAdder了。

    com.mmall.concurrency.example.atomic.AtomicExample3

    C:UsersHONGZHENHUAimoocconcurrencysrcmainjavacommmallconcurrencyexampleatomicAtomicExample3.java

    package com.mmall.concurrency.example.atomic;
    
    import com.mmall.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample3 {
    
         // 请求总数
         public static int clientTotal = 5000;//1000个请求
    
         // 同时并发执行的线程数
         public static int threadTotal = 200;//允许并发的线程数是50
    
         public static LongAdder count = new LongAdder();
    
         public static void main(String[] args) throws Exception {
             ExecutorService executorService = Executors.newCachedThreadPool();
             final Semaphore semaphore = new Semaphore(threadTotal);
             final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
             for (int i = 0; i < clientTotal; i++) {
                 executorService.execute(()-> {
                     try {
                         semaphore.acquire();
                         add();
                         semaphore.release();
                     } catch (Exception e) {
                         log.error("exception",e);
                     }
                     countDownLatch.countDown();
                 });
             }
                  countDownLatch.await();
                  executorService.shutdown();
                  log.info("count:{}",count);
                  //log.info("count:{}",count.get());
    
         }
    
         private static void add() {
             //count++;
             count.increment();
             //count.incrementAndGet();
             // count.getAndIncrement();
         }
    }

    AtomicLong我们就暂时先介绍到这里,现在呢我们来看一下Atomic包。

    这是JDK里面默认提供好的包,这个包里面我们可以看到我们平时用的一些基本类型,什么布尔类型啦整型啦Long型都有对应的原子相关的类,这里除了compareAndSwapInt这个函数还有一个函数compareAndSet需要大家注意一下,这个方法/函数更多的用在于AtomicBoolean这个类里面。为什么这么说呢?实际中我们经常会遇到比如我希望某件事情只执行一次,在执行这件事情之前呢它的标记可能为false,一旦执行之后我就要把它变成true。这个时候呢如果我们使用AtomicBoolean这个类里面的compareAndSet方法,分别传入false、true的时候呢,就可以保证对应我们要控制的那一段代码只执行一次,甚至可以理解为当前只有一个线程可以执行这段代码。如果我们在执行完之后再把这个变量标识为false之后,当然它可以继续执行。这样它达到的效果就是,同样的代码、同一时间只有一个线程可以执行。

  • 相关阅读:
    PAT (Advanced Level) 1060. Are They Equal (25)
    PAT (Advanced Level) 1059. Prime Factors (25)
    PAT (Advanced Level) 1058. A+B in Hogwarts (20)
    PAT (Advanced Level) 1057. Stack (30)
    PAT (Advanced Level) 1056. Mice and Rice (25)
    PAT (Advanced Level) 1055. The World's Richest (25)
    PAT (Advanced Level) 1054. The Dominant Color (20)
    PAT (Advanced Level) 1053. Path of Equal Weight (30)
    PAT (Advanced Level) 1052. Linked List Sorting (25)
    PAT (Advanced Level) 1051. Pop Sequence (25)
  • 原文地址:https://www.cnblogs.com/ZHONGZHENHUA/p/10044878.html
Copyright © 2011-2022 走看看