zoukankan      html  css  js  c++  java
  • 并发与高并发(七)-线程安全性-原子性-atomic

    前言

    何为原子性?它又是通过什么原理来控制线程安全的?这里主要介绍有关Atomic原子性操作的几个类的使用场景和方法。

    主体概要

    • AtomicInteger

    • AtomicLong

    • LongAdder

    • AtomicReference

    • AtomicIntegerFieldUpdater

    • AtomicStampedReference

    • AtomicBoolean

    主体内容

    一、线程安全性定义

    定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

    线程安全性主要体现在三个方面:原子性、可见性、有序性:

    • 原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作
    • 可见性:一个线程对主内存的修改可以及时地被其他线程观察到
    • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

    使用示例:

    1.AtomicInteger

    package com.controller.atomic;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import com.annoations.ThreadSafe;
    
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    @ThreadSafe
    public class AtomicIntegerTest {
        //请求数
        public static int clientTotal=5000;
        //并发数
        public static int threadTotal=200;
        //计数值
        public static AtomicInteger count= new AtomicInteger(0);
        
        public static void main(String[] args) throws InterruptedException{
            //创建线程池
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量(允许并发数)
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for(int i =0;i<clientTotal;i++){
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}",count.get());
        }
        
        private static void add(){
            //先增加操作,再获取当前的值
            count.incrementAndGet();
            //先获取当前的值,在增加操作
            //count.getAndIncrement();
        }
    }

    重点在于.incrementAndGet()方法中的unsafe.getAndAddInt()方法,如下所示:

    /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final int incrementAndGet() {
            return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
        }

    使用了一个叫unsafe的类,看它里面getAndAddInt的实现,标红的compareAndSwapInt方法需要注意,首先这个“paramObject”是指当前调用的对象,即上面例子中的count对象,而paramLong是当前的值,比如这里想执行2+1=3的操作,paramLong就等于2,paramInt就等于1。i是从调用这个getIntVolatile方法得到底层当前的值。那么compareAndSwapInt(paramObject, paramLong, i, i + paramInt))中变量值可以解释为compareAndSwapInt(count,2, 2, 2 +1)),如果paramLong和底层的i值相同的话,将其值更新为i+paramInt。也就是期望的值与底层值相等,才会执行+1操作,最后把底层的值覆盖掉,这就是CAS的核心。

     public final int getAndAddInt(Object paramObject, long paramLong, int paramInt)
       {
        int i;
         do
        {
           i = getIntVolatile(paramObject, paramLong);
         } while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt));
         return i;
       }

    2.AtomicLong

    package com.controller.atomic;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    
    import com.annoations.ThreadSafe;
    
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    @ThreadSafe
    public class AtomicLongTest {
        //请求数
        public static int clientTotal=5000;
        //并发数
        public static int threadTotal=200;
        //计数值
        public static AtomicLong count= new AtomicLong(0);
        
        public static void main(String[] args) throws InterruptedException{
            //创建线程池
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量(允许并发数)
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for(int i =0;i<clientTotal;i++){
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}",count.get());
        }
        
        private static void add(){
            //先增加操作,再获取当前的值
            count.incrementAndGet();
            //先获取当前的值,在增加操作
            //count.getAndIncrement();
        }
    }

    3.LongAdder

    package com.controller.atomic;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.LongAdder;
    
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    public class LongAddrTest {
        //请求数
            public static int clientTotal=5000;
            //并发数
            public static int threadTotal=200;
            //计数值
            public static LongAdder count= new LongAdder();
            
            public static void main(String[] args) throws InterruptedException{
                //创建线程池
                ExecutorService executorService = Executors.newCachedThreadPool();
                //定义信号量(允许并发数)
                final Semaphore semaphore = new Semaphore(threadTotal);
                //定义计数器
                final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
                for(int i =0;i<clientTotal;i++){
                    executorService.execute(()->{
                        try {
                            semaphore.acquire();
                            add();
                            semaphore.release();
                        } catch (Exception e) {
                            log.error("exception",e);
                        }
                        countDownLatch.countDown();
                    });
                }
                countDownLatch.await();
                executorService.shutdown();
                log.info("count:{}",count);
            }
            
            private static void add(){
                //先增加操作,再获取当前的值
                count.increment();
                //先获取当前的值,在增加操作
                //count.getAndIncrement();
            }
    }

    LongAdder 与 AtomicLong有什么区别?

    (1)AtomicLong 是基于 CAS 方式自旋更新的;LongAdder 是把 value 分成若干cell,并发量低的时候,直接 CAS 更新值,成功即结束。并发量高的情况,CAS更新某个cell值和需要时对cell数据扩容,成功结束;更新失败自旋 CAS 更新 cell值。取值的时候,调用 sum() 方法进行每个cell累加。

    (2)AtomicLong 包含有原子性的读、写结合的api;LongAdder 没有原子性的读、写结合的api,能保证结果最终一致性。
    低并发场景AtomicLong 和 LongAdder 性能相似,高并发场景 LongAdder 性能优于 AtomicLong。

    4.AtomicReference

    package com.controller.atomic;
    import java.util.concurrent.atomic.AtomicReference;
    import com.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    @ThreadSafe
    public class AtomicReferenceTest {
        private static AtomicReference<Integer> count = new AtomicReference<>(0);
        
        public static void main(String[] args){
            //如果值为0,则结果为2
            count.compareAndSet(0, 2);//2
            //如果值为0,则结果为1
            count.compareAndSet(0, 1);//no
            //如果值为1,则结果为3
            count.compareAndSet(1, 3);//no
            //如果值为2,则结果为4
            count.compareAndSet(2, 4);//4
            //如果值为3,则结果为5
            count.compareAndSet(3, 5);//no
            log.info("count:{}",count.get());
        }
    }

    5.AtomicIntegerFieldUpdater

    package com.controller.atomic;
    
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    public class AtomicIntegerFileUpdaterTest {
        
        private static AtomicIntegerFieldUpdater<AtomicIntegerFileUpdaterTest> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFileUpdaterTest.class, "count");
        @Getter
        public volatile int count=100;
        
        public static void main(String[] args){
            AtomicIntegerFileUpdaterTest AtomicIntegerFileUpdaterTest = new AtomicIntegerFileUpdaterTest();
            
            if(updater.compareAndSet(AtomicIntegerFileUpdaterTest, 100, 120)){
                log.info("update success,{}",AtomicIntegerFileUpdaterTest.getCount());
            }
            if(updater.compareAndSet(AtomicIntegerFileUpdaterTest, 100, 120)){
                log.info("update success,{}",AtomicIntegerFileUpdaterTest.getCount());
            }else{
                log.info("update failed,{}",AtomicIntegerFileUpdaterTest.getCount());
            }
        }
    }

    加粗部分指代的是当前类"AtomicIntegerFileUpdaterTest"下的“count”变量,而count必须被volatile关键字修饰,不能被static修饰。

    程序执行的结果为:

    update success,120
    update failed,120

    compareAndSet(param1,param2,param3)指代的是如果param1中被AtomicIntegerFieldUpdater定义的对象的变量等于param2,则将其更新为param3。否则不更新。

    6.AtomicStampedReference

    解决CAS的ABA问题,每次增加操作,在原有版本号基础上加一。

    关于ABA问题,这里作一个详细介绍:

    在多线程场景下CAS会出现ABA问题,关于ABA问题这里简单科普下,例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下
    1.线程1,期望值为A,欲更新的值为B
    2.线程2,期望值为A,欲更新的值为B
    线程1抢先获得CPU时间片,而线程2因为其他原因阻塞了,线程1取值与期望的A值比较,发现相等然后将值更新为B,然后这个时候出现了线程3,期望值为B,欲更新的值为A,线程3取值与期望的值B比较,发现相等则将值更新为A,此时线程2从阻塞中恢复,并且获得了CPU时间片,这时候线程2取值与期望的值A比较,发现相等则将值更新为B,虽然线程2也完成了操作,但是线程2并不知道值已经经过了A->B->A的变化过程。
    
    ABA问题带来的危害:
    
    小明在提款机,提取了50元,因为提款机问题,有两个线程,同时把余额从100变为50
    线程1(提款机):获取当前值100,期望更新为50,
    线程2(提款机):获取当前值100,期望更新为50,
    线程1成功执行,线程2某种原因block了,这时,某人给小明汇款50
    线程3(默认):获取当前值50,期望更新为100,
    这时候线程3成功执行,余额变为100,
    线程2从Block中恢复,获取到的也是100,compare之后,继续更新余额为50!!!
    此时可以看到,实际余额应该为100(100-50+50),但是实际上变为了50(100-50+50-50)这就是ABA问题带来的成功提交。
    解决方法: 在变量前面加上版本号,每次变量更新的时候变量的版本号都+1,即A->B->A就变成了1A->2B->3A。

    其中的compareAndSet()方法如下:

     public boolean compareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
            Pair<V> current = pair;
            return
                expectedReference == current.reference &&
                expectedStamp == current.stamp &&
                ((newReference == current.reference &&
                  newStamp == current.stamp) ||
                 casPair(current, Pair.of(newReference, newStamp)));
        }

    7.AtomicBoolean

    package com.controller.atomic;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class AtomicBooleanTest {
        private static AtomicBoolean isHappend = new AtomicBoolean(false);
        //请求数
        public static int clientTotal=5000;
        //并发数
        public static int threadTotal=200;
        
        public static void main(String[] args) throws Exception{
            //创建线程池
                    ExecutorService executorService = Executors.newCachedThreadPool();
                    //定义信号量(允许并发数)
                    final Semaphore semaphore = new Semaphore(threadTotal);
                    //定义计数器
                    final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
                    for(int i =0;i<clientTotal;i++){
                        executorService.execute(()->{
                            try {
                                semaphore.acquire();
                                test();
                                semaphore.release();
                            } catch (Exception e) {
                                log.error("exception",e);
                            }
                            countDownLatch.countDown();
                        });
                    }
                    countDownLatch.await();
                    executorService.shutdown();
                    log.info("isHappend:{}",isHappend.get());
        }
        public static void test(){
            if(isHappend.compareAndSet(false, true)){
                log.info("execute..");
            }
        }
    }

    结果:true

    这个类的示例用于在并发情况下,想只执行一次某段逻辑,绝对不会重复。

  • 相关阅读:
    Coding 初级教程(一)——用GitHub的GUI客户端对Coding的项目进行管理
    自己以前写的日记,现在公开(别的文章需要用)1
    7-网页,网站,微信公众号基础入门(微信配网_申请微信公众号)
    关于Keil4 转到 Keil5以后的一些错误解决
    6-网页,网站,微信公众号基础入门(PHP学习_1)
    5-网页,网站,微信公众号基础入门(配置网站--PHP配置上数据库)
    4-网页,网站,微信公众号基础入门(配置网站--下载安装PHP)
    3-网页,网站,微信公众号基础入门(学习网页_2)
    7-STM32物联网开发WIFI(ESP8266)+GPRS(Air202)系统方案安全篇(GPRS模块SSL连接MQTT)
    6-STM32物联网开发WIFI(ESP8266)+GPRS(Air202)系统方案安全篇(Wi-Fi模块SSL连接MQTT)
  • 原文地址:https://www.cnblogs.com/xusp/p/11875082.html
Copyright © 2011-2022 走看看