zoukankan      html  css  js  c++  java
  • 并发包学习之-atomic包

    一,模拟并发代码:

    线程不安全的代码

    //并发模拟代码
    public class CountExample {
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
        //全局变量
        public static int count = 0;
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        add();
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("count:"+count);
    
            //关闭线程池
            executorService.shutdown();
    
        }
    
        private static void add(){
            count++;
        }
    }

    二,.原子性-Atomic包
    1.AtomicInteger类中提供了incrementAndGet方法;
    public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    2.incrementAndGet方法又调用了Unsafe类的getAndAddInt方法
    public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
    var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
    }
    3.getAndAddInt方法又是如何保证原子性的呢?该方法调用了compareAndSwapInt方法(就是我们说的CAS)
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    compareAndSwapInt方法是native方法,这个方法是java底层的方法(不是通过java实现的)
    4.原理解析:

    public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
    var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
    }
    Object var1:传进来的AtomicInteger对象
    long var2:是传进来的值,当前要进行加一的值 (比如要进行2+1的操作, var2就是2)
    int var4:是传进来的值,进行自增要加上的值 (比如要进行2+1的操作, var4就是1)
    int var5:是通过调用底层的方法this.getIntVolatile(var1, var2);得到的底层当前的值
    while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)):
    通过do{} while()不停的将当前对象的传进来的值和底层的值进行比较,
    如果相同就将底层的值更新为:var5+var4(加一的操作),
    如果不相同,就重新再从底层取一次值,然后再进行比较,这就是CAS的核心。
    帮助理解:
    AtomicInteger里面存的值看成是工作内存中的值
    把底层的值看成是主内存中的值。在多线程中,工作内存中的值和主内存中的值会出现不一样的情况。

    线程安全的代码:
    //线程安全的并发
    public class CountExample2 {
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
        //全局变量
        public static AtomicInteger count = new AtomicInteger(0);
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        add();
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("count:"+count.get());
            //关闭线程池
            executorService.shutdown();
        }
        private static void add(){
            //count++;
            count.incrementAndGet();
        }
    }
    
    
    二,AtomicBoolean的使用:
    1.需求:
    当第一个线程进来的时候,我希望做一些初始化的操作,当第一个线程做完初始化的操作,我需要标记初始化的工作已经做完了,其他后进来的线程不需要做这个初始化的工作了,并且必须保证只被做了一次
    有问题的代码:
    public class CountExample4 {
        //请求总数
        public static int clientTotal = 50000;
        //同时并发执行的线程数
        public static int threadTotal = 2000;
        public static boolean isHappened = false;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        test();
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("isHappened:"+isHappened);
            //关闭线程池
            executorService.shutdown();
        }
    
        private static void test(){
            //如果是false,就更新为true,并且我希望只有一个线程执行了判断条件里的代码
            if (isHappened == false){
                isHappened = true;
                System.out.println("execute");
            }
        }
    }
    
    

    执行结果:

    execute
    execute
    isHappened:true

    注意: 

    当有大量的线程同时进来时,我们不能保证execute只被执行了一次。这时,我们可以考虑使用AtomicBoolean类

    public class CountExample3 {
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
        public static AtomicBoolean isHappened = new AtomicBoolean(false);
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            //信号灯,同时允许执行的线程数
            final Semaphore semaphore = new Semaphore(threadTotal);
            //计数器,
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        //获取信号灯,当并发达到一定数量后,该方法会阻塞而不能向下执行
                        semaphore.acquire();
                        test();
                        //释放信号灯
                        semaphore.release();
                    }catch (InterruptedException e){
                        System.out.println("exception");
                        e.printStackTrace();
                    }
                    //闭锁,每执行一次add()操作,请求数就减一
                    countDownLatch.countDown();
                });
            }
    
            //等待上面的线程都执行完毕,countDown的值减为0,然后才向下执行主线程
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印count的值
            System.out.println("isHappened:"+isHappened.get());
            //关闭线程池
            executorService.shutdown();
        }
    
        private static void test(){
            //如果是false,就更新为true
           if (isHappened.compareAndSet(false,true)){
               System.out.println("execute");
           }
        }

     三,CAS中的ABA问题
    描述:在CAS操作时,其他线程将变量的值从A改成了B,然后又将B改回了A。
    解决思路:每次变量改变时,将变量的版本号加1,只要变量被修改过,变量的版本号就会发生递增变化
    使用的类:AtomicStampedReference,
    调用compareAndSet方法:
    public boolean compareAndSet(V expectedReference,
    V newReference,
    int expectedStamp,
    int newStamp) {
    Pair<V> current = pair;
    return
    expectedReference == current.reference &&
    expectedStamp == current.stamp &&
    ((newReference == current.reference &&
    newStamp == current.stamp) ||
    casPair(current, Pair.of(newReference, newStamp)));
    }
    stamp是每次更新时就维护的, 通过对比来判断是不是一个版本号,expectedStamp == current.stamp

     
     
  • 相关阅读:
    WPF之Binding基础八 使用Linq数据作为Binding的源
    WPF之Binding基础七 使用XML数据作为Binding的源
    WPF之Binding基础六 使用ADO.NET作为Binding的数据源
    WPF之Binding基础五 使用集合对象作为列表控件的ItemSource
    WPF之Binding基础四 使用DataContext作为Binding的源
    解决 VS的IISExpress localhost可以访问,127.0.0.1和本机ip访问不了(错误400)
    c# 使用特性封装提供额外行为Validate验证
    c# 反射调用方法、获取设置值、好处和局限性
    c# 反射加读取类、方法、特性、破坏单例
    linq to object使用
  • 原文地址:https://www.cnblogs.com/inspred/p/9623247.html
Copyright © 2011-2022 走看看