zoukankan      html  css  js  c++  java
  • JAVA并发基础

    CPU多级缓存:

     

     

     

     

     

    read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用

    load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放到工作内存中的变量副本中

    write(写入):作用于主内存的变量,它把store操作从工作内存中得到的变量值放入主内存的变量中

    不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中

     

     

    如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值。

     

     

     CountDownLatch:可以阻塞线程,并保证线程在满足特定条件时继续执行;线程执行完成之后,再进行其他的处理

    Semaphore:阻塞线程,并控制同一时间请求的并发量(即同时执行的并发数)

    添加几个注解:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.Target;
    import  java.lang.annotation.RetentionPolicy;
    
    /**
     * Marks a class or coding style as <em>not recommended</em>.
     *
     * <p>Applicable to type declarations only and retained in source only, so it
     * serves purely as documentation for readers of the code.
     */
    @Retention(RetentionPolicy.SOURCE)
    @Target({ElementType.TYPE})
    public @interface NotRecommend {

        /** Optional free-form note; defaults to the empty string. */
        String value() default "";
    }
    

      

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.Target;
    import  java.lang.annotation.RetentionPolicy;
    
    /**
     * Marks a class or coding style as <em>not thread-safe</em>.
     *
     * <p>Applicable to type declarations only and retained in source only, so it
     * serves purely as documentation for readers of the code.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.SOURCE)
    public @interface NotThreadSafe {

        /** Optional free-form note; defaults to the empty string. */
        String value() default "";
    }
    

      

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.Target;
    import  java.lang.annotation.RetentionPolicy;
    
    /**
     * Marks a class or coding style as <em>recommended</em>.
     *
     * <p>Applicable to type declarations only and retained in source only, so it
     * serves purely as documentation for readers of the code.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.SOURCE)
    public @interface Recommend {

        /**
         * Optional free-form note; defaults to the empty string so a bare
         * {@code @Recommend} keeps working.
         */
        String value() default "";
    }
    

      

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.Target;
    import  java.lang.annotation.RetentionPolicy;
    
    /**
     * Marks a class or coding style as <em>thread-safe</em>.
     *
     * <p>Applicable to type declarations only and retained in source only, so it
     * serves purely as documentation for readers of the code.
     */
    @Retention(RetentionPolicy.SOURCE)
    @Target({ElementType.TYPE})
    public @interface ThreadSafe {

        /** Optional free-form note; defaults to the empty string. */
        String value() default "";
    }
    

      线程不安全:

    import com.example.annoations.NotThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * Demonstrates a lost-update race: {@link #clientTotal} tasks each increment a
     * plain {@code int} with no synchronization, at most {@link #threadTotal} of them
     * at a time, and the final total is logged.
     *
     * <p>The unsynchronized {@code count++} is deliberate (hence {@code @NotThreadSafe});
     * the logged total may be lower than {@code clientTotal}.
     */
    @Slf4j
    @NotThreadSafe
    public class ConcurrencyTest {
        // Total number of simulated requests.
        public static int clientTotal=5000;
        // Maximum number of tasks allowed to run the increment concurrently.
        public static int   threadTotal=200;
        // Shared counter, intentionally unprotected.
        public static  int count=0;

        /**
         * Submits all tasks, waits for every one to finish, then logs the counter.
         *
         * @param args unused
         * @throws Exception if the wait on the latch is interrupted
         */
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            try{
                for(int i=0;i<clientTotal;i++){
                    executorService.execute(()->{
                        try{
                            semaphore.acquire();  // wait for a permit
                            try{
                                add();
                            }finally{
                                // Return the permit even if add() throws.
                                semaphore.release();
                            }
                        }catch(InterruptedException e){
                            // Restore the flag so the pool thread can observe the interrupt.
                            Thread.currentThread().interrupt();
                            log.error("exception",e);
                        }catch(Exception e){
                            log.error("exception",e);
                        }finally{
                            // Always count down, otherwise main() could wait forever.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            }finally{
                executorService.shutdown();
            }
            log.info("count:{}",count);
        }

        /** Increments the shared counter; read-modify-write, not atomic. */
        private static void add(){
            count++;
        }
    }
    

      线程安全性:

     

    import com.example.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Thread-safe counterpart of the plain-{@code int} counter demo: {@link #clientTotal}
     * tasks each increment an {@link AtomicInteger}, at most {@link #threadTotal} of
     * them at a time, and the final total is logged.
     */
    @Slf4j
    @ThreadSafe
    public class CountExample2 {
        // Total number of simulated requests.
        public static int clientTotal=5000;
        // Maximum number of tasks allowed to run the increment concurrently.
        public static int   threadTotal=200;
        // Shared counter updated atomically.
        public static AtomicInteger  count= new AtomicInteger(0);

        /**
         * Submits all tasks, waits for every one to finish, then logs the counter.
         *
         * @param args unused
         * @throws Exception if the wait on the latch is interrupted
         */
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            try{
                for(int i=0;i<clientTotal;i++){
                    executorService.execute(()->{
                        try{
                            semaphore.acquire();  // wait for a permit
                            try{
                                add();
                            }finally{
                                // Return the permit even if add() throws.
                                semaphore.release();
                            }
                        }catch(InterruptedException e){
                            // Restore the flag so the pool thread can observe the interrupt.
                            Thread.currentThread().interrupt();
                            log.error("exception",e);
                        }catch(Exception e){
                            log.error("exception",e);
                        }finally{
                            // Always count down, otherwise main() could wait forever.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            }finally{
                executorService.shutdown();
            }
            log.info("count:{}",count.get());
        }

        /** Atomically increments the shared counter. */
        private static void add(){
            count.incrementAndGet();
        }
    }

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    import com.example.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Counter demo using {@link AtomicLong}: {@link #clientTotal} tasks each increment
     * the counter, at most {@link #threadTotal} of them at a time, and the final total
     * is logged.
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExample2 {
        // Total number of simulated requests.
        public static int clientTotal=5000;
        // Maximum number of tasks allowed to run the increment concurrently.
        public static int   threadTotal=200;
        // Shared counter updated atomically.
        public static AtomicLong count= new AtomicLong(0);

        /**
         * Submits all tasks, waits for every one to finish, then logs the counter.
         *
         * @param args unused
         * @throws Exception if the wait on the latch is interrupted
         */
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            try{
                for(int i=0;i<clientTotal;i++){
                    executorService.execute(()->{
                        try{
                            semaphore.acquire();  // wait for a permit
                            try{
                                add();
                            }finally{
                                // Return the permit even if add() throws.
                                semaphore.release();
                            }
                        }catch(InterruptedException e){
                            // Restore the flag so the pool thread can observe the interrupt.
                            Thread.currentThread().interrupt();
                            log.error("exception",e);
                        }catch(Exception e){
                            log.error("exception",e);
                        }finally{
                            // Always count down, otherwise main() could wait forever.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            }finally{
                executorService.shutdown();
            }
            log.info("count:{}",count.get());
        }

        /** Atomically increments the shared counter. */
        private static void add(){
            count.incrementAndGet();
        }
    }
    

      

    import com.example.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.LongAdder;
    
    /**
     * Counter demo using {@link LongAdder}: {@link #clientTotal} tasks each increment
     * the adder, at most {@link #threadTotal} of them at a time, and the final sum is
     * logged.
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExample6 {
        // Total number of simulated requests.
        public static int clientTotal=5000;
        // Maximum number of tasks allowed to run the increment concurrently.
        public static int   threadTotal=200;
        // Shared adder; starts at zero.
        public static LongAdder count= new LongAdder();

        /**
         * Submits all tasks, waits for every one to finish, then logs the adder.
         *
         * @param args unused
         * @throws Exception if the wait on the latch is interrupted
         */
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            try{
                for(int i=0;i<clientTotal;i++){
                    executorService.execute(()->{
                        try{
                            semaphore.acquire();  // wait for a permit
                            try{
                                add();
                            }finally{
                                // Return the permit even if add() throws.
                                semaphore.release();
                            }
                        }catch(InterruptedException e){
                            // Restore the flag so the pool thread can observe the interrupt.
                            Thread.currentThread().interrupt();
                            log.error("exception",e);
                        }catch(Exception e){
                            log.error("exception",e);
                        }finally{
                            // Always count down, otherwise main() could wait forever.
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            }finally{
                executorService.shutdown();
            }
            // The adder is passed directly, so the logger formats it via toString().
            log.info("count:{}",count);
        }

        /** Adds one to the shared adder. */
        private static void add(){
            count.increment();
        }
    }

     

    import com.example.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.atomic.AtomicReference;
    
    /**
     * Walks an {@link AtomicReference} through a fixed series of compare-and-set
     * attempts and logs the value that remains.
     *
     * <p>Only the attempts whose expected value matches the current one take effect:
     * 0 -> 2 succeeds, 2 -> 4 succeeds, the rest are no-ops, so the logged value is 4.
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExample7 {
        private static AtomicReference<Integer> count=new AtomicReference<>(0);

        public static void main(String[] args) {
            // Each row is {expected, replacement}, attempted in order.
            // NOTE: compareAndSet matches by reference; this relies on boxing of these
            // small int values yielding the same cached Integer instances.
            int[][] attempts = {
                    {0, 2},
                    {0, 1},
                    {1, 3},
                    {2, 4},
                    {3, 5},
            };
            for (int[] attempt : attempts) {
                count.compareAndSet(attempt[0], attempt[1]);
            }
            log.info("count:{}",count.get());
        }
    }
    

      

    import com.example.annoations.ThreadSafe;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    
    /**
     * Shows {@link AtomicIntegerFieldUpdater} performing compare-and-set on a field of
     * an existing object: the first 100 -> 200 swap succeeds, the identical second
     * attempt fails because the field no longer holds 100.
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExample8 {
        // Target field of the updater; it is declared volatile and non-static.
        @Getter
        public volatile int count=100;

        // Updater bound to the "count" field of this class.
        private static AtomicIntegerFieldUpdater<AtomicExample8> updater=
                AtomicIntegerFieldUpdater.newUpdater(AtomicExample8.class,"count");

        public static void main(String[] args) {
            AtomicExample8 target = new AtomicExample8();

            boolean firstSwapped = updater.compareAndSet(target, 100, 200);
            if (firstSwapped) {
                log.info("update success 1,{}", target.getCount());
            }

            boolean secondSwapped = updater.compareAndSet(target, 100, 200);
            if (!secondSwapped) {
                log.info("update failed,{}", target.getCount());
                return;
            }
            log.info("update success 2,{}", target.getCount());
        }
    }

    import com.example.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample9 {
        private static AtomicBoolean isHappened=new AtomicBoolean(false);//默认是是否发生
    
        //请求总数
        public static int clientTotal=5000;
        //同时并发执行的线程数
        public static int   threadTotal=200;
    
        public static void main(String[] args) throws Exception{
            ExecutorService executorService= Executors.newCachedThreadPool();
            final Semaphore semaphore=new Semaphore(threadTotal);
            final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
            for(int i=0;i<clientTotal;i++){
                executorService.execute(()->{
                    try{
                        semaphore.acquire();  //是否允许被执行
                        test();
                        semaphore.release();  //释放信号量
                    }catch(Exception e){
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("isHappened:{}",isHappened.get());
        }
        private static void test(){
            if(isHappened.compareAndSet(false,true));
            log.info("execute");
        }
    

      

    同步锁。

    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Shows instance-level locking with {@code synchronized}: both methods lock on the
     * receiver object, so calls on <em>different</em> instances do not block each other.
     */
    @Slf4j
    public class SynchronizedExample1 {
        /**
         * Synchronized block on {@code this}; callers using different instances are
         * independent of each other.
         *
         * @param j id printed in each log line to tell callers apart
         */
        public void test1(int j){
            synchronized (this){
                for (int i=0;i<10;i++){
                    log.info("test1{}-{}",j,i);
                }
            }
        }

        /**
         * Synchronized instance method; callers using different instances are
         * independent of each other. The {@code synchronized} modifier is not part of
         * the method signature, so an overriding method is not synchronized unless it
         * declares the modifier itself.
         *
         * @param j id printed in each log line to tell callers apart
         */
        public synchronized void test2(int j){
            for (int i=0;i<10;i++){
                log.info("test2{}-{}",j,i);
            }
        }

        /**
         * Runs {@code test2} on two separate instances from two pool threads.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            SynchronizedExample1 example1=new SynchronizedExample1();
            SynchronizedExample1 example2=new SynchronizedExample1();
            ExecutorService executorService= Executors.newCachedThreadPool();
            // Distinct ids so the two callers can be told apart in the output.
            executorService.execute(()->{
                example1.test2(1);
            });

            executorService.execute(()->{
                example2.test2(2);
            });
            // Let the submitted tasks finish, then release the pool threads so the
            // JVM can exit promptly.
            executorService.shutdown();
        }
    }
    
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    /**
     * Shows class-level locking with {@code synchronized}: both methods lock on the
     * {@code SynchronizedExample2} class object, so callers are serialized regardless
     * of which instance (if any) they go through.
     */
    @Slf4j
    public class SynchronizedExample2 {
        /**
         * Synchronized block on the class object, so all callers share one lock even
         * when they use different instances.
         *
         * @param j id printed in each log line to tell callers apart
         */
        public void test1(int j){
            synchronized (SynchronizedExample2.class){
                for (int i=0;i<10;i++){
                    log.info("test1{}-{}",j,i);
                }
            }
        }

        /**
         * Static synchronized method; it locks on the class object.
         *
         * @param j id printed in each log line to tell callers apart
         */
        public static synchronized void test2(int j){
            for (int i=0;i<10;i++){
                log.info("test2{}-{}",j,i);
            }
        }

        /**
         * Runs the static synchronized {@code test2} from two pool threads.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ExecutorService executorService= Executors.newCachedThreadPool();
            // Distinct ids so the two callers can be told apart in the output.
            executorService.execute(()->{
                test2(1);
            });

            executorService.execute(()->{
                test2(2);
            });
            // Let the submitted tasks finish, then release the pool threads so the
            // JVM can exit promptly.
            executorService.shutdown();
        }
    }

     原子性对比:

     

     无法保证线程安全:(不具有原子性)

     

    适用场景:作为状态标识量

     

    线程安全性总结:

  • 相关阅读:
    mysql存储过程
    Mysql中的触发器
    快速开始、环境搭建、修改包名、新建模块、正式部署
    windows下redis下载安装
    Windows10下mysql 8.0.19 安装配置方法图文教程
    IDEA中安装SVN
    常见页面报错
    Python AttributeError: 'Module' object has no attribute 'STARTF_USESHOWINDOW'
    如何编写一篇高质量的技术博文?学习本文的排名靠前大法
    Linux use apktool problem包体变大GLIBC2.14等问题
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/11221088.html
Copyright © 2011-2022 走看看