zoukankan      html  css  js  c++  java
  • 07 Java的CAS机制(配合volatile,无锁的资源保护),常用API以及背后的unsafe对象

    一 无锁机制的应用场景与基本原理(CAS操作)

    1-1 CAS的应用场景

    账户取款问题:如何确保多线程环境下,账户的取款没有出现问题。

    采用三种方式:

    • 不对公共变量上锁(不安全)
    • 使用synchronized
    • 采用CAS的无锁机制
    package chapter6;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    interface Account{
        Integer getBalance();
        void withdraw(Integer amount);
        static void demo(Account account){
            List<Thread> ts = new ArrayList<>();
            long start = System.nanoTime();
            for (int i = 0; i < 1000; i++) {
                ts.add(new Thread(() -> {
                    account.withdraw(10);
                }));
            }
            ts.forEach(Thread::start);
            ts.forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long end = System.nanoTime();
            System.out.println(account.getBalance()
                    + " cost: " + (end-start)/1000_000 + " ms");
        }
    }
    
    // 实现抽象接口
    class AccountUnsafe implements Account {
        private Integer balance;
        public AccountUnsafe(Integer balance) {
            this.balance = balance;
        }
        @Override
        public Integer getBalance() {
            return balance;
        }
        @Override
        public void withdraw(Integer amount) {
            balance -= amount;
        }
    }
    
    class AccountSafe1 implements Account {
        private Integer balance;
        public AccountSafe1(Integer balance) {
            this.balance = balance;
        }
        @Override
        public Integer getBalance() {
            synchronized (this){
                return this.balance;
            }
        }
        @Override
        public void withdraw(Integer amount) {
            synchronized (this) {
                this.balance -= amount;
            }
        }
    }
    
    class AccountCas implements Account {
        private AtomicInteger balance;
        AccountCas(int balance){
            this.balance = new AtomicInteger(balance);
        }
        @Override
        public Integer getBalance(){
            return balance.get();
        }
        @Override
        public void withdraw(Integer amount) {
            synchronized (this) {
                while(true){
                    int prev = balance.get();
                    int next = prev-amount;
                    if(balance.compareAndSet(prev,next)){
                        break;
                    }
                }
            }
        }
    }
    
    public class test1 {
        public static void main(String[] args) {
            Account.demo(new AccountUnsafe(10000));
            Account.demo(new AccountSafe1(10000));
            Account.demo(new AccountCas(10000));
        }
    
    }
    

    运行结果:

    • 可以看到采用synchronized以及AtomicInteger都可以保证最终的余额正确,都是0
    • Atomic Integer在获取共享变量时,通过CAS操作,将自己线程拥有的共享变量值与最新的进行比较,如何相同才会对共享变量进行操作,否则会进行回退。
    4800 cost: 469 ms
    0 cost: 194 ms
    0 cost: 163 ms
    

    1-2 CAS的底层原理

    CAS:从某一内存上取值V,和预期值A进行比较,如果内存值V和预期值A的结果相等,那么我们就把新值B更新到内存,如果不相等,那么就重复上述操作直到成功为止。

    • CAS是一条CPU的原子指令,不会造成所谓的数据不一致的问题,也就是说CAS是线程安全的。

    • 预期值A可以理解为线程上个时间状态获取到的值。内存值V的获取是原子操作中获得的

    • 获取主存值,比较主存值与预期值,执行失败或设置为修改值这三个步骤是由CPU的指令的原子性保证的

    • 预期值与主存值不一致说明当前要设置的值也并非线程想要设置的值

    • 其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交
      换】的原子性。(CAS通过指令的原子性操作保证操作的原子性
    • 在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再
      开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子
      的。

    01 Java CAS原理与应用

    1-3 CAS操作与volatile配合使用

    获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
    注意

    • volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原
      子性
    • CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

    1-4 为什么CAS的无锁机制比synchronized"效率高"?

    • 无锁机制在多核CPU硬件的支持下避免了线程的上下文切换。注意如果是单核CPU,无锁机制无法发挥作用

    实际使用场景

    • CAS需要多核CPU的支持,需要使用CAS的线程数不能比CPU核心数多。

    1-5 CAS的总结(重点!!!)

    结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

    • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再
      重试呗。
    • synchronized 是基于悲观锁的思想(注意CAS不是锁):最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
    • CAS 体现的是无锁并发、无阻塞并发(无阻塞在于该操作不会导致线程的上下文切换),因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

    重点理解乐观与悲观的思想,无锁与无阻塞的概念

    二、原子整数类型与原子引用类型

    2-1 JUC工具包提供的原子整数类型

    AtomicBoolean
    AtomicInteger
    AtomicLong
    
    2-1-1 原子整数类型相关的原子操作
    AtomicInteger i = new AtomicInteger(0);
    i.incrementAndGet()                            
    i.decrementAndGet()
    i.getAndDecrement()
    i.getAndAdd(5)
    i.getAndUpdate(p -> p - 2)
    i.updateAndGet(p -> p + 2)
    i.getAndAccumulate(10, (p, x) -> p + x)
    i.accumulateAndGet(-10, (p, x) -> p + x)
    
    • 上面的四个表达式都是通过lambda表达式提供变量的变换方式,提高了程序的扩展性

    具体方式,是通过函数接口实现。

    // 利用内置函数简化代码
    class AccountCas implements Account {
        private AtomicInteger balance;
        AccountCas(int balance){
            this.balance = new AtomicInteger(balance);
        }
        @Override
        public Integer getBalance(){
            return balance.get();
        }
        @Override
        public void withdraw(Integer amount) {
            synchronized (this) {
    //            while(true){
    //                int prev = balance.get();
    //                int next = prev-amount;
    //                if(balance.compareAndSet(prev,next)){
    //                    break;
    //                }
    //            }
                balance.addAndGet(-1*amount);
            }
        }
    }
    
    
    2-2-2 updateAndGet的实现原理

    基本思想

    /*下面这个函数就是通过CAS函数来实现updateAndGet.
    while(true){
    	int prev = i.get();
    	int next = prev*10;
    	if(i.compareAndSet(prev,next)){
    		break;
    	}
    }
    
    
    2-2-3 采用函数接口进行扩展

    策略模式有着类似的思想:在有多种算法相似的情况下,使用 if...else 所带来的复杂和难以维护。

        public static void updataAndGet(AtomicInteger i, IntUnaryOperator operator){
            while(true){
                int prev = i.get();
                int next = operator.applyAsInt(prev);
                if(i.compareAndSet(prev,next)){
                    break;
                }
            }
        }
    
    2-2-4 最终的实现
    package chapter6;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.IntUnaryOperator;
    public class test2
    {
        public static void main(String[] args) {
            AtomicInteger i = new AtomicInteger(5);
            System.out.println(updataAndGet(i,p -> p/2));
        }
        public static int updataAndGet(AtomicInteger i, IntUnaryOperator operator){
            while(true){
                int prev = i.get();
                int next = operator.applyAsInt(prev);
                if(i.compareAndSet(prev,next)){
                    return next;                      // 如果更新成功,返回数值
                }
            }
        }
    }
    

    JDK源代码实现

    
    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }
    

    2-2 JUC工具包提供的原子引用类型

    为什么需要原子引用类型?

    • 因为有时候的共享数据未必是基本的数据类型,所以需要提供原子引用类型。

    实例:(对BigDecimal使用无所机制):

    package chapter6;
    import java.math.BigDecimal;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;
    
    interface Account1{
        BigDecimal getBalance();
        void withdraw(BigDecimal amount);
        static void demo(Account1 account){
            List<Thread> ts = new ArrayList<>();
            long start = System.nanoTime();
            for (int i = 0; i < 1000; i++) {
                ts.add(new Thread(() -> {
                    account.withdraw(new BigDecimal("10"));
                }));
            }
            ts.forEach(Thread::start);
            ts.forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long end = System.nanoTime();
            System.out.println(account.getBalance()
                    + " cost: " + (end-start)/1000_000 + " ms");
        }
    }
    
    class AccountCas1 implements Account1 {
        private AtomicReference<BigDecimal> balance;
        AccountCas1(BigDecimal balance){
            this.balance =  new AtomicReference<BigDecimal>(balance);
        }
        @Override
        public BigDecimal getBalance(){
            return balance.get();
        }
        @Override
        public void withdraw(BigDecimal amount) {
            while(true){
                BigDecimal prev = balance.get();
                BigDecimal next = prev.subtract(amount);
                if(balance.compareAndSet(prev,next)){
                    break;
                }
            }
        }
    }
    
    public class test3 {
        public static void main(String[] args) {
            Account1.demo(new AccountCas1(new BigDecimal("10000")));
        }
    }
    

    执行结果:

    0 cost: 250 ms
    

    2-3 ABA问题

    package chapter6;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.atomic.AtomicReference;
    @Slf4j(topic = "c.test4")
    public class test4 {
        static AtomicReference<String> ref = new AtomicReference<>("A");
        public static void main(String [] args) throws InterruptedException{
            log.warn("main start");
            // 线程获取值A
            String prev = ref.get();
            // other函数中2个线程,线程t1将A变为B,线程t2将B变为A。
            other();
            Thread.sleep(1000);
            // 主线程无法判断之前的值是否已经修改
            // 判断线程获得值与共享变量的最新值是否一致。
            log.warn("change A->C {}",ref.compareAndSet(prev,"C"));
        }
        private static void other(){
            new Thread(()->{
                log.warn("change A->B {}",ref.compareAndSet(ref.get(),"B"));
            },"t1").start();
    
            new Thread(()->{
                log.warn("change B->A {}",ref.compareAndSet(ref.get(),"A"));
            },"t2").start();
        }
    }
    
    

    ABA问题:线程无法感知到其他线程对共享变量的修改(其他线程将共享变量改成其他又改了回来)

    如何解决ABA问题,或者说如何让线程中能够感知到其他线程带来的ABA类型的变量修改?。

    2-3-1 使用AtomicStampedReference解决ABA问题(多个版本)

    解决方法:ABA问题之所以会出现,主要是因为compare的时候仅仅比较的类型的值,如果想要避免这种问题,可以通过增添变量的版本解决

    • Java提供了支持版本类型的引用类型
    • 通过版本号可以知道变量改变了多少
    package chapter6;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.atomic.AtomicStampedReference;
    @Slf4j(topic = "c.test4")
    public class test5 {
        static AtomicStampedReference<String> ref = new AtomicStampedReference<String>("A",0);
        public static void main(String [] args) throws InterruptedException{
            log.warn("main start");
            String prev = ref.getReference();
            int stamp = ref.getStamp();
            log.warn("{}",stamp);
            other();
            Thread.sleep(1000);
            log.warn("change A->C {}",ref.compareAndSet(prev,"C",stamp,stamp+1));
        }
    
        private static void other(){
            new Thread(()->{
                log.warn("change A->B {}",ref.compareAndSet(ref.getReference(),"B",ref.getStamp(),
                        ref.getStamp()+1));
                log.warn("new version is {}",ref.getStamp());
            },"t1").start();
    
            new Thread(()->{
                log.warn("change B->A {}",ref.compareAndSet(ref.getReference(),"A",
                        ref.getStamp(),ref.getStamp()+1));
                log.warn("new version is {}",ref.getStamp());
            },"t2").start();
        }
    }
    
    
    • 上面的代码中每个更新数据不仅比较数值,还比较版本号
    • 更新数据的时候也可以更新数据版本
    2-3-2 使用AtomicMarkableReference解决ABA问题(只有bool值)
    package chapter6;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.atomic.AtomicMarkableReference;
    class GarbageBag {
        String desc;
        public GarbageBag(String desc) {
            this.desc = desc;
        }
        public void setDesc(String desc) {
            this.desc = desc;
        }
        @Override
        public String toString() {
            return super.toString() + " " + desc;
        }
    }
    
    @Slf4j(topic = "c.test6")
    public class test6 {
        public static void main(String[] args) {
            GarbageBag bag = new GarbageBag("垃圾袋状态:满了");
            AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true); //
            log.warn("主线程 start...");
            GarbageBag prev = ref.getReference();
            log.warn(prev.toString());
    
    
            new Thread(() -> {
                while (!ref.compareAndSet(bag, bag, true, false)) {}
                bag.setDesc("垃圾袋状态:空");
            },"保洁阿姨").start();
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            /*
               保洁阿姨线程在主线程之前成功更换的垃圾袋,主线程通过mark标记,得知原子引用对象
               已经被其他线程修改。    
             */
    
            log.warn("主线程尝试更换垃圾袋:");
            boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
            log.warn("主线程是否更换垃圾袋:" + success);
            log.warn(ref.getReference().toString());
        }
    }
    
    • AtomicMarkableReference相比较AtomicStampedReference只有2种状态,没有多个版本号,适合于只需要进行一次的操作。

    执行结果:

    [main] WARN c.test6 - 主线程 start...
    [main] WARN c.test6 - chapter6.GarbageBag@372f7a8d 垃圾袋状态:满了
    [保洁阿姨] WARN c.test6 - chapter6.GarbageBag@372f7a8d 垃圾袋状态:空
    [main] WARN c.test6 - 主线程尝试更换垃圾袋:
    [main] WARN c.test6 - 主线程是否更换垃圾袋:false
    [main] WARN c.test6 - chapter6.GarbageBag@372f7a8d 垃圾袋状态:空
    

    2-4 原子数组

    应用场景:需要对数组中的数据提供线程安全的保护。

    • AtomicIntegerArray
    • AtomicLongArray
    • AtomicReferenceArray

    知识点: java的函数式接口,提供了四种类型的函数式接口。

    基本特点

    • 所谓函数式接口,指的是只有一个抽象方法的接口。可以被隐式转换为Lambda表达式,用@FunctionalInterface注解标识。

    分类(主要根据传入的参数以及返回的结果进行区分):

    • 右边都是lambda表达式的格式,传入参数可以采用lambda
    类别1: supplier 提供者 无中生有            ()->结果
    类别2: function 函数 一个参数一个结果       (参数)->结果 , BiFunction (参数1,参数2)->结果
    类别3: consumer 消费者 一个参数没结果       (参数)->void, BiConsumer (参数1,参数2)->
    

    原子数组的使用示例:

    package chapter6;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicIntegerArray;
    import java.util.function.*;
    
    public class test7 {
    
        public static void main(String[] args) {
            /*
              -------------------测试普通数组-----------------------------------------------
              对普通的数组进行测试,从原理上将每个数组的元素应该都是10000,但是由于线程并不安全,所以
              出现问题。
             */
            demo(
                    ()->new int[10],
                    (array)->array.length,
                    (array,index)->array[index]++,
                    array->System.out.println(Arrays.toString(array))
            );
            /*
            ---------------------------测试原子数组--------------------------------------
               可以看到使用原子数组能够保证每个变量累加的值都是10000,保证原子数组内部变量的安全性。
             */
            demo(
                    ()->new AtomicIntegerArray(10),
                    AtomicIntegerArray::length,
                    AtomicIntegerArray::getAndIncrement,
                    System.out::println
                    );
    
        }
    
        /**
         参数1,提供数组、可以是线程不安全数组或线程安全数组
         参数2,获取数组长度的方法
         参数3,自增方法,回传 array, index
         参数4,打印数组的方法
         */
        // supplier 提供者 无中生有 ()->结果
        // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
        // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
        /* 这里采用函数式接口,是为了兼容普通数组以及原子引用数组*/
        private static <T> void demo(
                Supplier<T> arraySupplier,
                Function<T, Integer> lengthFun,
                BiConsumer<T, Integer> putConsumer,
                Consumer<T> printConsumer ) {
                List<Thread> ts = new ArrayList<>();
                T array = arraySupplier.get();            // 01 利用实现的函数接口创建数组
                int length = lengthFun.apply(array);      // 02 利用实现函数接口获取数组的长度
                for (int i = 0; i < length; i++) {
                // 每个线程对数组作 10000 次操作
                    ts.add(new Thread(() -> {
                        for (int j = 0; j < 10000; j++) {
                            putConsumer.accept(array, j%length); // 03 利用实现的函数接口获取对修改数组的元素
                        }
                    }));
                }
            ts.forEach(t -> t.start()); // 启动所有线程
            ts.forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }); // 等所有线程结束
            printConsumer.accept(array);                 //  04 利用实现的函数接口打印数组的元素。
        }
    }
    
    

    执行结果

    [9684, 9741, 9688, 9616, 9608, 9614, 9613, 9605, 9602, 9591]
    [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
    

    2-5 字段更新器(对类的一部分成员提供线程安全保护)

    应用场景:针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现
    异常

    /*提供了三种类型的字段更新器*/
    AtomicReferenceFieldUpdater 
    AtomicIntegerFieldUpdater
    AtomicLongFieldUpdater
    

    使用实例

    package chapter6;
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    public class test8 {
        private volatile int field;
        public static void main(String[] args) {
                // 通过定义原子更新器为这个类别添加field字段
                AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(test8.class, "field");
                test8 test = new test8();
                fieldUpdater.compareAndSet(test, 0, 10);
                // 修改成功 field = 10
                System.out.println(test.field);
                // 修改成功 field = 20
                fieldUpdater.compareAndSet(test, 10, 20);
                System.out.println(test.field);
                // 修改失败 field = 20,expect是10,与field存储的20并不一致。
                // 只有expect与field存储的值一致才能更新成功
                fieldUpdater.compareAndSet(test, 10, 30);
                System.out.println(test.field);
        }
    }
    
    

    执行结果

    10
    20
    20
    

    2-6 原子累加器性能的测试

    比较AtomicLong与LongAdder

    package chapter6;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.function.*;
    
    public class test9 {
        public static void main(String[] args) {
            // 测试2种类型的累加的效率测试
            /*利用了函数接口测试,分别测试5次*/
            System.out.printf("Atomic LongAdder Test!!!
    ");
            for (int i = 0; i < 5; i++) {
                demo(() -> new LongAdder(), adder -> adder.increment());
            }
            System.out.printf("--------------------------------------
    ");
            System.out.printf("Atomic Long Test!!! 
    ");
            for (int i = 0; i < 5; i++) {
                demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
            }
            System.out.printf("--------------------------------------
    ");
    
    
        }
    
        private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
            T adder = adderSupplier.get();
            long start = System.nanoTime();
            List<Thread> ts = new ArrayList<>();
    
            // 4 个线程,每人累加 50 万,总结应为200万
            for (int i = 0; i < 40; i++) {
                ts.add(new Thread(() -> {
                    for (int j = 0; j < 500000; j++) {
                        action.accept(adder);
                    }
                }));
            }
            ts.forEach(t -> t.start());
            ts.forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long end = System.nanoTime();
            System.out.println(adder + " cost:" + (end - start)/1000_000);
        }
    }
    

    执行结果

    • 可以看到无论是LongAdder()与AtomicLong都能确保变量累加的原子性。
    • LongAddr累加的效率要比AtomicLong的效率要高
    Atomic LongAdder Test!!!
    20000000 cost:132
    20000000 cost:123
    20000000 cost:106
    20000000 cost:118
    20000000 cost:126
    --------------------------------------
    Atomic Long Test!!!
    20000000 cost:558
    20000000 cost:623
    20000000 cost:618
    20000000 cost:574
    20000000 cost:669
    --------------------------------------
    
    为什么LongAddr累加的效率要比AtomicLong的效率要高?

    原因:LongAddr在多个线程竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加
    Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性
    能。 (将多个线程并行累加,最后汇总结果

    三 LongAdder的实现原理

    3-1 内部关键域

    // 累加单元, 懒惰初始化,所谓懒惰加载就是通过当有多个线程竞争时,才会初始化多个Cell变量。
    transient volatile Cell[] cells;
    // 基础值, 如果没有竞争, 则用 cas 累加这个域
    transient volatile long base;
    // 在 cells 创建或扩容时, 置为 1, 表示加锁
    transient volatile int cellsBusy;
    
    • 上面的三个变量都使用volatile保证可见性。

    3-2 利用CAS机制如何实现加锁(cellsBusy的底层原理)?

    • 注意这种加锁方式不能用于生产实践
    package chapter6;
    import lombok.extern.slf4j.Slf4j;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j(topic = "c.test10")
    public class test10 {
        // 设置锁的标志为: 1表示加锁,0表示解锁
        // 这里实际上自定义了一个CAS机制
        private AtomicInteger state = new AtomicInteger(0);
        // 线程调用这个方法会不断使用CAS操作进行比较,直到操作成功。
        // 这样写存在风险,线程会不断循环。会进行空运算。
        public void lock(){
            while(true){
                if(state.compareAndSet(0,1)){
                    break;
                }
            }
        }
        public void unlock(){
            log.warn("unlock...");
            state.set(0);
        }
        public static void main(String[] args) {
            test10 lock = new test10();
            new Thread(() -> {
                log.warn("begin");
                lock.lock();
                log.warn("lock");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
    
            }, "t1").start();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(() -> {
                log.warn("begin");
                lock.lock();
                try{
                    log.warn("lock");
    
                }finally {
                    lock.unlock();
                }
            }, "t2").start();
        }
    }
    

    执行结果

    [t1] WARN c.test10 - begin
    [t1] WARN c.test10 - lock
    [t2] WARN c.test10 - begin
    [t1] WARN c.test10 - unlock...
    [t2] WARN c.test10 - lock
    [t2] WARN c.test10 - unlock...
    

    结果说明:线程t1与线程t2都需要执行先加锁后解锁的程序。

    核心:state.compareAndSet(0,1)

    • 线程t1加锁成功后,通过CAS操作将实际值设为1,然后睡眠。
    • 线程t2启动后,尝试设置变量,但由于实际值是1不符合预期,所以不停循环。

    总结:通过CAS机制可以实现加锁保护变量的效果,确保当前的共享资源只有一个线程访问

    • 下面的cellsBusy就是利用CAS机制对累加单元cells实现加锁
    // 累加单元, 懒惰初始化,所谓懒惰加载就是通过当有多个线程竞争时,才会初始化多个Cell变量。
    transient volatile Cell[] cells;
    // 在 cells 创建或扩容时, 置为 1, 表示加锁
    transient volatile int cellsBusy;
    

    3-3 累加单元的伪共享问题的解决

    累加单元类的源码分析
    // 防止缓存行伪共享
        @sun.misc.Contended
        static final class Cell {
            volatile long value;
            Cell(long x) { value = x; 
        }
        
        // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
        final boolean cas(long prev, long next) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
        }
        // 省略不重要代码
      }
    
    CPU读取各层次存储的时间?

    CPU读取信息从 时钟周期
    寄存器 1 cycle (4GHz 的 CPU 约为0.25ns)
    L1 cache 3~4 cycle
    L2 cache 10-20 cycle
    L3 cache 40-45 cycle
    内存 120-240 cycle

    缓存的优势

    • 需要靠预读数据至缓存来提升效率。而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)

    缓存带来的问题

    • 缓存会带来数据副本的问题,每个CPU核心都有一个缓存,同一份数据在位于不同CPU核心的缓存需要考虑数据一致性问题
    • 如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
    cell单元与缓存的关系实例

    上图中有2个cell单元,2个线程共同读取这个cell单元。

    cell单元的大小: Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value,64位虚拟机)

    缓存行大小:64字节,可以存储2个cell对象。

    什么是缓存行伪共享?

    2个cell对象存储在一个缓存行带来的问题(也称之为缓存单元的伪共享):任意一个cell对象的修改会造成2个cell对象都失效,增加了内存读取的开销(由于2个对象在同一缓存行)。

    解决方法:将cell对象放在不同的缓存行,避免一个cell对象的失效影响另外一个cell对象。

    • @sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

    • @sun.misc.Contended注解避免一个缓存行容纳多个缓存行。

    3-4 LongAdder的add方法源码分析

    public class LongAdder extends Striped64 implements Serializable {
        private static final long serialVersionUID = 7249069246863182397L;
        public LongAdder() {
        }
        public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
            /*先判断cells数组是否为空,为空则对尝试对base对量进行累加*/
            /*----------------
               第一条线:
               cells数组为空(表明之前没有发生竞争)
               casBase(b = base, b + x):对base进行CAS累加,如果累加成功返回true否则返回false
               调用longAccumulate函数
               ----------------
               第二条线:
               cells数组不为空(表明之前发生过竞争)
               a = as[getProbe() & m]) == null:当前的线程有没有对应的cell单元
               ---没有,执行longAccumulate函数
               ---有,(uncontended = a.cas(v = a.value, v + x):对累加单元进行CAS累加,如果失败依旧调用
                                                        执行longAccumulate函数
            */
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                boolean uncontended = true;
                if ( as == null || 
                    (m = as.length - 1) < 0 ||
           (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))
                   )
                    longAccumulate(x, null, uncontended);
            }
        }
    

    3-5 LongAdder的longAccumulate 方法源码分析


    JDK8的源码如下

    什么时候会调用该方法?

    • 累加单元或者累加单元数组没有创建会调用这个函数。

    子函数的说明

    casCellsBusy(): 对累加数组进行加锁,c将cellBusy由0变为1,实现该线程对cells数组的加锁
    cellSABusy = 0: 对累加数组进行解锁。
    

    longAccumlate源码

     final void longAccumulate(long x, LongBinaryOperator fn,
                                  boolean wasUncontended) {
            int h;
            if ((h = getProbe()) == 0) {
                ThreadLocalRandom.current(); // force initialization
                h = getProbe();
                wasUncontended = true;
            }
            boolean collide = false;                // True if last slot nonempty
            for (;;) {
                Cell[] as; Cell a; int n; long v;
                // 当累加单元数组不为空,且长度大于0时
                if ((as = cells) != null && (n = as.length) > 0) {
                    if ((a = as[(n - 1) & h]) == null) {
                        if (cellsBusy == 0) {       // Try to attach new Cell
                            /*创建cell对象*/
                            Cell r = new Cell(x);   // Optimistically create
                            /*
                             cellsBusy=0表示这个cell数组没有上锁,
                             通过casCellsBusy()将cellsBusy由0变为1,
                             从而进行加锁。 
                             */
                            if (cellsBusy == 0 && casCellsBusy()) {
                                boolean created = false;
                                /* rs[j = (m - 1) & h] == null
                                   检查cell对应位置的cell对象是否为空,如果不为空,将创建的cell对象引用                                放入到该位置。
                                */
                                try {               // Recheck under lock
                                    Cell[] rs; int m, j;
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    cellsBusy = 0;
                                }
                                if (created)         // 创建成功,则break
                                    break;
                                continue;           // Slot is now non-empty
                            }
                        }
                        collide = false;
                    }
                    else if (!wasUncontended)       // CAS already known to fail
                        wasUncontended = true;      // Continue after rehash
                    /*对cell的累加单元进行累加,累加成功则直接break*/
                    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                 fn.applyAsLong(v, x))))
                        break;
                    /*累加失败,当累加单元的数目大于CPU的核心数的时候,通过设置collide为fasle避免
                      累加数组的继续扩容。
                    */
                    else if (n >= NCPU || cells != as)
                        collide = false;            // At max size or stale
                    else if (!collide)
                        collide = true;
                    /*加锁,*/
                    else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
                            /*对累加数组进行扩容,容量是乘以2增长*/
                            if (cells == as) {      // Expand table unless stale
                                Cell[] rs = new Cell[n << 1]; 
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   // Retry with expanded table
                    }
                    h = advanceProbe(h);
                }
    /*=================================================================================*/     
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {                           // Initialize table
                        if (cells == as) {
                            Cell[] rs = new Cell[2];  // 创建累加数组,只会给当前线程创建累加单元
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (init)
                        break;
                }
                else if (casBase(v = base, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                    break;                          // Fall back on using base
            }
        }
    

    3-6 LongAdder的sum方法源码分析

        /**
         * Returns the current sum.  The returned value is <em>NOT</em> an
         * atomic snapshot; invocation in the absence of concurrent
         * updates returns an accurate result, but concurrent updates that
         * occur while the sum is being calculated might not be
         * incorporated.
         *
         * @return the sum
         */
        public long sum() {
            Cell[] as = cells; Cell a;
            long sum = base;
            if (as != null) {     // 累加单元数组不为空
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

    四 Unsafe对象的简单分析

    4-1 unsafe对象概述

    • 提供了非常底层的,操作内存、线程的方法
    • Unsafe 对象不能直接调用,只能通过反射获得
    package sun.misc;
    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.security.ProtectionDomain;
    import sun.reflect.CallerSensitive;
    import sun.reflect.Reflection;
    // 可以看到theUnsafe是私有的,因此只能通过反射获得。
    public final class Unsafe {
        private static final Unsafe theUnsafe;
        public static final int INVALID_FIELD_OFFSET = -1;
        public static final int ARRAY_BOOLEAN_BASE_OFFSET;
        public static final int ARRAY_BYTE_BASE_OFFSET;
    
    • CAS以及park/unpark方法底层都是调用的unsafe对象
    /*AtomicInteger成员对象就有Unsafe对象。*/
    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
        /*setup to use Unsafe.compareAndSwapInt for updates*/
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;
    ......
    /*
     LockSupport.java的park/unpark中
    */   
    public static void park() {
        UNSAFE.park(false, 0L);
    }
        
    public static void unpark(Thread thread) {
        if (thread != null)
        	UNSAFE.unpark(thread);
    }
    

    通过反射获取Unsafe对象

    package chapter6;
    import sun.misc.Unsafe;
    import java.lang.reflect.Field;
    public class testUnsafe {
        public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
            /*
                public final class Unsafe { private static final Unsafe theUnsafe;
             */
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);                    // 允许访问私有变量
            Unsafe unsafe = (Unsafe) theUnsafe.get(null);     // 由于是静态成员,所以访问无需传入实例
            System.out.println(unsafe);
        }
    }
    
    

    运行结果

    sun.misc.Unsafe@135fbaa4
    

    4-2 利用unsafe对象实现对普通对象的CAS成员保护

    代码

    package chapter6;
    import lombok.Data;
    import sun.misc.Unsafe;
    import java.lang.reflect.Field;
    public class testUnsafe {
    
        public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
            /*
                public final class Unsafe { private static final Unsafe theUnsafe;
             */
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);                    // 允许访问私有变量
            Unsafe unsafe = (Unsafe) theUnsafe.get(null);     // 由于是静态成员,所以访问无需传入实例
    
            // step1:获取class的域的偏移地址
            long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
            long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
            Teacher t = new Teacher(-1,null);
            // step2:利用Unsafe对象提供的CAS方法对对象的成员进行修改
            // CAS操作,如果修改成功则返回true.
            unsafe.compareAndSwapInt(t, idOffset, -1, 1); // 返回 true
            unsafe.compareAndSwapObject(t, nameOffset, null, "张三"); // 返回 true
            System.out.println(t);
        }
    }
    
    @Data
    class Teacher{
        volatile int id;
        volatile String name;
        Teacher(int id,String name){
            this.id  = id;
            this.name = name;
        }
    }
    
    

    4-3 利用unsafe对象实现MyAtomicInteger

    代码实现

    package chapter6;
    import sun.misc.Unsafe;
    import java.lang.reflect.Field;
    import java.util.ArrayList;
    import java.util.List;
    class UnsafeAccessor {
        static Unsafe unsafe;
        static {
            Field theUnsafe = null;
            try {
                theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
                theUnsafe.setAccessible(true);
                unsafe = (Unsafe) theUnsafe.get(null);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    
        static Unsafe getUnsafe() {
            return unsafe;
        }
    }
    
    // account接口,用于测试安全性
    interface Account5{
        Integer getBalance();
        void withdraw(Integer amount);
        static void demo(Account5 account){
            List<Thread> ts = new ArrayList<>();
            long start = System.nanoTime();
            for (int i = 0; i < 1000; i++) {
                ts.add(new Thread(() -> {
                    account.withdraw(10);
                }));
            }
            ts.forEach(Thread::start);
            ts.forEach(t -> {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long end = System.nanoTime();
            System.out.println(account.getBalance()
                    + " cost: " + (end-start)/1000_000 + " ms");
        }
    }
    
    class myAtomicInteger implements Account5{
        private volatile int value;       /*定义所保护的域,注意CAS操作配合volatile使用*/
        static final Unsafe unsafe;
        static final long DATA_OFFSET;
        static {
            unsafe = UnsafeAccessor.getUnsafe();
            try {
                 /*获取保护的区域的偏移地址*/
                DATA_OFFSET = unsafe.objectFieldOffset(myAtomicInteger.class.getDeclaredField("value"));
            } catch (NoSuchFieldException e) {
                throw new Error(e);
            }
        }
        public myAtomicInteger(int data) {
            this.value = data;
        }
        public void decrease(int amount) {
            int oldValue;
            while(true) {
                 // 获取共享变量旧值
                oldValue = value;
                 // cas 尝试修改 value 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
                if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) 			  {
                    return;
                }
            }
        }
        public int getValue() {
            return this.value;
        }
    
        @Override
        public Integer getBalance() {
            return getValue();
        }
    
        @Override
        public void withdraw(Integer amount) {
            decrease(amount);
        }
    }
    
    class test20 {
        public static void main(String[] args) {
            // 测试自己实现的原子Integer
            Account5.demo((Account5) new myAtomicInteger(10000));
        }
    }
    
    • 使用Unsafe对象提供的CAS机制保护对象的成员。
    /*这种写法与之间的CAS逻辑上是一致的
    --预期值与实际值一致,才进行更新。
    */
    
    while(true) {
         // 获取共享变量旧值
         oldValue = value;
         // cas 尝试修改 value 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
         if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) 			  {
        	 return;
         }
    }
    
    执行结果
    0 cost: 218 ms
    

    五 知识点总结

    (1)CAS 与 volatile的作用与应用(通常CAS与volatile配合使用)
    (2)基于CAS机制的API的使用

    • 原子整数

    • 原子引用

    • 原子数组

    • 字段更新器

    • 原子累加器

    • Unsafe

    (3)原理方面掌握

    • LongAdder 源码分析
    • 伪共享的定义与解决方法( 3.3 @sun.misc.Contended注解的作用)

    参考资料

    多线程基础课程

  • 相关阅读:
    day34-python-异常
    day33-python-反射、内置方法、元类、 属性查找
    day31/32待补
    day30-mixin、重载、多态、绑定与非绑定方法、内置函数
    day29-python-property、继承、属性查找、多继承带来的菱形问题
    day28-python-封装
    day27-python-面向对象介绍、实现面向对象编程、查找顺序
    luffy项目
    oc基本数据类型
    iOS 入门了解
  • 原文地址:https://www.cnblogs.com/kfcuj/p/14590849.html
Copyright © 2011-2022 走看看