zoukankan      html  css  js  c++  java
  • 并发包学习(一)-Atomic包小记

    此篇是J.U.C学习的第一篇Atomic包相关的内容,希望此篇总结能对自己的基础有所提升。本文总结来源自《Java并发编程的艺术》第七章并配以自己的实践理解。如有错误还请指正。

    一、案例分析

    首先看两段代码:

    代码①:

    /**
     * @author laoyeye
     * @Description: 5000个线程,200个并发
     * @date 2018/8/16 21:58
     */
    public class IntTest {
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(count);
        }
    
        private static void add() {
            count++;
        }

    5000个线程200个并发的情况下,对一个共享变量进行++操作。

    结果:4997

    代码②:

    public class AtomicIntegerTest {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static AtomicInteger count = new AtomicInteger(0);
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(count);
        }
    
        private static void add() {
            count.incrementAndGet();
        }
    }

    5000个线程200个并发的情况下,同样进行每次加一操作。

    结果:5000。和预期的结果一样

    那么为什么AtomicInteger可以得到预期的结果,而使用基本数据类型Int的值却不对呢?

    主要是原子性的问题,Int的操作,在多线程的情况下并不保证原子性,而AtomicInteger则是一个JDK提供的一个原子操作类,具体AtomicInteger怎么实现的原子性可以看下文。

    二、Atomic相关概念

    java从JDK1.5开始提供java.util.concurrent.atomic包,即本文所述的Atomic包。这个包的原子操作类提供了一个简单,高效,线程安全地更新一个变量的方式。

    因为变量的类型很多,Atomic包基本上分为四种类型的更新方式,分别是原子更新基本类型,原子更新数组,原子更新引用和原子更新属性(字段)。Atomic包的类基本上都是使用Unsafe实现的包装类。 Unsafe 类提供了硬件级别的原子操作,可以安全的直接操作内存变量,其在 JUC 源码中被广泛的使用。

    三、原子更新基本类型

    1、AtomicBoolen:原子更新布尔类型。

    2、AtomicInteger:原子更新整型。

    3、AtomicLong:原子更新整型。

    AtomicInteger详解

    同样以一种的代码②为例,为什么AtomicInteger的incrementAndGet()方法保证了原子性的操作呢,我们来看一下源码的实现:

    源码①:

        static {
            try {
                valueOffset = unsafe.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }

    首先我们通过unsafe调用了它的objectFieldOffset(Field field)方法,这个方法返回指定的变量在所属类的内存偏移地址,偏移地址仅仅在该Unsafe函数中访问指定字段时使用。

    源码②:

    unsafe.getAndAddInt(this, valueOffset, 1) + 1;
       public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }

    getIntVolatile获取对象obj中偏移量offset的变量对应的volative内存语义的值,即预期的值var5。

    compareAndSwapInt方法中,var1为需要改变的对象,var2为偏移量(即之前求出来的valueOffset的值),var5为expect的值,第四个为update后的值。

    当value的值与expect这个值相等,那么则将value修改为update这个值,并返回true,否则返回false。

    此操作极为常说的CAS原子操作,这里使用while循环是考虑到多个线程同时调用的情况CAS失败后需要自旋重试。

    AtomicBoolen详解

    代码③

    public class AtomicBooleanTest {
    
            // 请求总数
            public static int clientTotal = 5000;
    
            // 同时并发执行的线程数
            public static int threadTotal = 200;
    
            public static AtomicBoolean isHappened = new AtomicBoolean(false);
    
            public static void main(String[] args) throws Exception {
                ExecutorService executorService = Executors.newCachedThreadPool();
                final Semaphore semaphore = new Semaphore(threadTotal);
                final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
                for (int i = 0; i < clientTotal; i++) {
                    executorService.execute(() -> {
                        try {
                            semaphore.acquire();
                            test();
                            semaphore.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                executorService.shutdown();
                System.out.println(isHappened.get());
            }
    
            private static void test() {
                if (isHappened.compareAndSet(false, true)) {
                    System.out.println("execute");
                }
            }
        }

    执行结果:

    execute
    true

    通过结果可知System.out.println("execute");的代码只执行过一次,200的并发,为什么只执行了一次呢,我们再来看下源码的解决办法。

    源码③

        public final boolean compareAndSet(boolean expect, boolean update) {
            int e = expect ? 1 : 0;
            int u = update ? 1 : 0;
            return unsafe.compareAndSwapInt(this, valueOffset, e, u);
        }

    我们看到当调用compareAndSet方法时,先把Boolean型转换为整型,在使用compareAndSwapInt进行CAS。所以即使在200并发的情况下,AtomicBoolen依旧能够保持原子性。

    通过上面两个类的讲解我们看到都是使用的compareAndSwapInt的方法,unsafe类还提供了compareAndSwapLong,用于AtomicLong,以及compareAndSwapObject方法。而像char,float,double等数据类型没有对应的原子操作类,这时候我们可以参考AtomicBoolen的思路做类似处理。

     四、原子更新数组

    1、AtomicIntegerArray:原子更新整型数组里的元素

    2、AtomicLongArray:原子更新长整型数组里的元素

    3、AtomicReferenceArray:原子更新引用类型数组里的元素

    这里我们只介绍下AtomicIntegerArray,基本操作类似。

    代码④

    public class AtomicIntegerArrayTest {
    
        // 请求总数
        public static int clientTotal = 5000;
    
        // 同时并发执行的线程数
        public static int threadTotal = 200;
    
        static int[] value = new int[]{1,2};
    
        public static AtomicIntegerArray ai = new AtomicIntegerArray(value);
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int i = 0; i < clientTotal ; i++) {
                executorService.execute(() -> {
                    try {
                        semaphore.acquire();
                        test();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            System.out.println(ai.get(0));
            System.out.println(value[0]);
        }
    
        private static void test() {
            ai.getAndSet(0,3);
        }
    }

    结果:3,1

    为什么是3和1呢,同样的我们从源码中找答案。

    源码④:

        public final int getAndSet(int i, int newValue) {
            return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
        }
        public final int getAndSetInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    
            return var5;
        }

    同样的原理,当前位置的数组value的值和预期的值相等,然后将对应的元素更新为新的值。但是需要注意的是,AtomicIntegerArray会将当前数组复制一份,所以当AtomicIntegerArray对内部的数组元素进行修改后,不会影响到原先的数组。

    五、原子更新引用类型

    1、AtomicReference:原子更新引用类型

    2、AtomicStampedReference:更新带有版本号的引用类型,可解决CAS的ABA问题

    3、AtomicMarkableReference:原子更新带有标记位的引用类型

     原子更新基本类型每次只能更新一个变量,如果要原子更新更多变量,这时候就需要引用类型了。

    代码⑤

    public class AtomicReferenceTest {
        public static void main(String[] args) {
    
            User user1 = new User("张三",12);
            User user2 = new User("lisi",20);
    
            AtomicReference<User> ar = new AtomicReference<User>();
            ar.set(user1);
            ar.compareAndSet(user1, user2);
    
            System.out.println("user " + ar.get().getName());
        }
    
    
    static class User {
        private String name;
        private int old;
    
        public String getName() {
            return name;
        }
    
        public int getOld() {
            return old;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public void setOld(int old) {
            this.old = old;
        }
    
        public User(String name, int old) {
            this.name = name;
            this.old = old;
        }
    }
    }

    结果:user lisi

    可以看到结果已经原子更新为lisi了,年龄也同步更新。

    代码⑥

    public class AtomicMarkableReferenceTest {
        public static void main(String[] args) {
    
    
            User user1 = new User("张三",12);
            User user2 = new User("lisi",20);
    
            AtomicStampedReference ar = new AtomicStampedReference(user1,0);
    
            final  Integer stamp = ar.getStamp();
    
            ar.compareAndSet(user1, user2,stamp,stamp+10);
    
            System.out.println("user " + ((User)ar.getReference()).getName());
            System.out.println("user " + ar.getStamp());
    
            System.out.println( ar.compareAndSet(user1, user2, stamp,stamp+10));
        }
    
    
    static class User {
        private String name;
        private int old;
    
        public String getName() {
            return name;
        }
    
        public int getOld() {
            return old;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public void setOld(int old) {
            this.old = old;
        }
    
        public User(String name, int old) {
            this.name = name;
            this.old = old;
        }
    }
    }

    结果:

    user lisi
    user 10
    false

    可以看到我们在做了原子更新后,版本号也做了改变,这时候如果还用原来的版本号去更新,就会出现更新失败的情况。

    AtomicMarkableReference跟AtomicStampedReference类似 
    AtomicStampedReference是使用pair的int stamp作为计数器使用,AtomicMarkableReference的pair使用的是boolean mark。 
    就像一杯水,AtomicStampedReference可能关心的是动过几次,AtomicMarkableReference关心的是有没有被人动过,方法都比较简单,不在演示了。

    六、原子更新字段类

    1、AtomicIntegerFieldUpdater:更新整型字段

    2、AtomicLongFieldUpdater:更新长整型字段

    3、AtomicReferenceFieldUpdater:原子更新引用类型里的字段

    public class AtomicIntegerFieldUpdaterTest {
    
        private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> updater =
                AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterTest.class, "count");
    
        public volatile int count = 100;
    
        public static void main(String[] args) {
    
            AtomicIntegerFieldUpdaterTest ai = new AtomicIntegerFieldUpdaterTest();
    
            if (updater.compareAndSet(ai, 100, 120)) {
                System.out.println("方法1,"+ai.getCount());
            }
    
            if (updater.compareAndSet(ai, 100, 120)) {
                System.out.println("方法2,"+ai.getCount());
            } else {
                System.out.println("方法3,"+ai.getCount());
            }
        }
    
        public int getCount() {
            return count;
        }
    
        public void setCount(int count) {
            this.count = count;
        }
    }

    结果:

    方法1,120
    方法3,120

     原子更新字段类需要两部,①必须使用静态方法newupdate()创建一个更新器,并且设置想要更新的类和属性。第二步,更新类的字段属性必须使用public volatile修饰

     七、1.8新增的LongAdder相关类

    这个类是1.8新增的一个类,为什么在已经有AtomicLong的情况下,还是增加了这个类呢?

    这主要是由于AtomicLong CAS算法的缺陷造成的,众所周知,CAS是比较当前值与预期的值是否相等,相等则更新为新的值,否则重新自旋取值。这就造成了CAS在高并发情况性下大量失败,性能较低的情况。

    既然AtomicLong性能问题是由于过多线程同时去竞争同一个变量的更新而降低的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源,那么性能问题不就迎刃而解了吗?

    没错,因此,JDK8 提供的LongAdder就是这个思路。这个类我目前只在网上了解到原理,还未应用也不了解源码实现,等以后再更新吧。

    下文来自简书:https://www.jianshu.com/p/22d38d5c8c2a

    总结分析下LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效的原因

    • 首先和AtomicLong一样,都会先采用cas方式更新值
    • 在初次cas方式失败的情况下(通常证明多个线程同时想更新这个值),尝试将这个值分隔成多个cell(sum的时候求和就好),让这些竞争的线程只管更新自己所属的cell(因为在rehash之前,每个线程中存储的hashcode不会变,所以每次都应该会找到同一个cell),这样就将竞争压力分散了

    AtomicLong可否可以被LongAdder替代

    有了传说中更高效的LongAdder,那AtomicLong可否不使用了呢?当然不是!

    答案就在LongAdder的java doc中,从我们翻译的那段可以看出,LongAdder适合的场景是统计求和计数的场景,而且LongAdder基本只提供了add方法,而AtomicLong还具有cas方法(要使用cas,在不直接使用unsafe之外只能借助AtomicXXX了),,例如getAndIncrement、getAndDecrement等,使用起来非常的灵活,而LongAdder只有add和sum,使用起来比较受限。

  • 相关阅读:
    【Linux系列汇总】小白博主的嵌入式Linux实战快速进阶之路(持续更新)
    【matlab系列汇总】小白博主的matlab学习实战快速进阶之路(持续更新)
    【FreeRTOS学习01】CubeIDE快速整合FreeRTOS创建第一个任务
    STM32F767ZI NUCLEO144 基于CubeIDE快速开发入门指南
    【matlab 基础篇 01】快速开始第一个程序(详细图文+文末资源)
    Linux 通过终端命令行切换系统语言
    ubuntu 1604升级到ubuntu 1804无法忽视的细节问题(亲测有效)
    假如用王者荣耀的方式学习webpack
    小程序第三方框架对比 ( wepy / mpvue / taro )
    我所理解的前端
  • 原文地址:https://www.cnblogs.com/laoyeye/p/9496936.html
Copyright © 2011-2022 走看看