zoukankan      html  css  js  c++  java
  • JUC并发编程

    JDK并发包

    1.JUC简介

    在jdk1.5提供了java.util.concurrent包,简称JUC,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量任务框架;还提供了设计用于多线程上下文中的 Collection 实现等;

    2.概述

    从JDK官方文档可以看到JUC主要包括3部分组成

    • java.util.concurrent:包下主要是包括并发相关的接口与类,阻塞队列、线程池等,里面包含 59 个类或者接口

    • java.util.concurrent.atomic: 该包下主要包括原子性操作相关的类,比如常用的AtomicInteger、AtomicBoolean、AtomicIntegerArry等,里面包含18个类或者接口

    • java.util.concurrent.locks:该包下主要包括同步控制的锁相关的类,比如ReentrantLock、ReadWriteLock、ReentranReadWriteLock等,里面包含12个类或者接口

    image-20210624114532631

    3. 同步控制(java.util.concurrent.locks)

    同步控制是并发程序必不可少的重要手段,之前学习的synchronized关键字就是一种最简单的控制方法,他决定了一个线程是否可以访问临界区资源。同时object.wait()和object.notify()方法起到了线程的等待和通知的作用。

    下面具体聊聊JUC工具包的使用,首先进入java.util.concurrent.locks包下,主要包括3个接口和9个类

    image-20210624122325131

    1. Lock 接口

    Lock锁接口里面定义了6个方法,3个类实现了Lock接口,分别是可重入锁ReentrantLock、读锁ReadLock、写锁writeLock

    image-20210624140034337

    image-20210624140315249

    1. 可重入锁ReentrantLock

    jdk1.5中新增了ReetrantLock类,重入锁ReentrantLock可以完全替代synchronized关键字,并且在扩展功能上也更加强大,而且在使用上也更加灵活。

    重入锁使用java.util.concurrent.locks.ReentrantLock类来实现,ReentrantLock类的构造方法如下,ReentrantLock默认是非公平锁。

    image-20210624141447496

    从源码中我们也可以发现ReentrantLockb本身没有任何代码逻辑,实现都在其内部类Sync中。那么NonfairSync和FariSync又是什么呢?

    image-20210624144700920

    从源代码中发现类ReentrantLock中有3个内部类,sync、NonfairSync和FariSync,而NonfairSync和FairSync有继承了抽象类Sync,分别对应非公平锁和公平锁。

    image-20210624145231473

    image-20210624152849793 image-20210624152916949

    2.锁实现的基本原理:

    Sync的父类AbstractQueuedSynchrnized经常被称作队列同步器(AQS),该类的父类是AbstractOwnableSynchronizer。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:

    • 需要一个state变量,标记锁的状态。state变量至少有两个值:0、1。对state变量的操作,需要确保线程安全,也就是会用到CAS

    • 需要记录当前是哪个线程持有锁

    • 需要底层支持对一个线程进行阻塞或唤醒操作

    • 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS

    3. 中断响应

    对于synchronized来说,如果一个线程在等待锁,那么结果就只有两种情况,要么获得锁继续执行,要么继续等待。而使用重入锁,则提供了另外一种可能,那就是线程可以被中断。如果一个线程在等待锁,那么它依然可以接收到一个通知,被告知无需再等待,可以停止工作了,这种情况对于处理死锁有一定帮助的。提供了lockInterruptibly()方法来实现。

    image-20210625154308286

    2. Condition接口

    如果理解了object.wait()和object.notify()方法的话,那么也就很容易理解condition对象了。wait、notify是和关键字synchronized合作使用的,而condition是和重入锁使用的。condition主要方法如下

    image-20210624150712348

    和Object.wait()和notify()方法一样,当线程使用Condition.await()时,要求线程持有相关的重入锁,在Condition.await调用后,这个线程会释放这把锁。同样调用Condition.signal()方法时,也需要线程获取到重入锁。

    public class ConditionDemo {
    ​
        public static void main(String[] args) {
    ​
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
    ​
            new Thread(()->{
                lock.lock();
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + "-->开始执行");
    ​
                    condition.await();//让出线程,等待主线程调用signalAll()唤醒
                    System.out.println(Thread.currentThread().getName() + "-->被唤醒又开始执行");
    ​
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            },"子线程").start();
    ​
            //主线程
            System.out.println(Thread.currentThread().getName() + "-->开始执行");
            try {
                TimeUnit.SECONDS.sleep(2);
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + "-->子线程执行await后,主线程执行");
                    condition.signalAll();
    ​
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
    ​
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    3.ReadWriteLock接口

    ReadWriteLock是JDK5中提供的读写分离锁,读写分离锁可以有效的帮助减少锁竞争,以提升系统性能。ReadWriteLock接口中只有如下两个方法,分别是读锁readLock和writeLock写锁。

    image-20210624150745352

    读写锁允许多个线程同时读,但是考虑到数据的完整性,写写操作和读写操作之间依然是需要互相等待和持有锁的。总的来说,读写锁的访问约束如下

     
    非阻塞 阻塞
    阻塞 阻塞
      • 读-读不互斥:读读之间不阻塞

      • 读-写互斥:读阻塞写,写也会阻塞读

      • 写-写互斥:写写阻塞

    public class ReadWriteLockDemo {
    ​
        /**
         * 测试读写锁
         */
        public static void main(String[] args) {
            ReadWriteNumber readWriteNumber = new ReadWriteNumber();
      //读线程     
            for (int i = 0; i < 10; i++) {
                new  Thread(()->{
                    readWriteNumber.setNumber(new Random().nextInt(100));
                },"写线程"+String.valueOf(i)).start();
            }
    //写线程
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    readWriteNumber.getNumber();
                },"读线程"+ String.valueOf(i)).start();
            }
    ​
        }
    }
    ​
    class ReadWriteNumber{
        private volatile  int number = 0;
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    ​
        //写方法
        public void setNumber(int number)
        {
            readWriteLock.writeLock().lock();
            try {
                TimeUnit.SECONDS.sleep(1);
                this.number = number;
                System.out.println(Thread.currentThread().getName() + "-->写入后的数字是: " + this.number);
            } catch (Exception e)
            {
                e.printStackTrace();
            }   finally {
                readWriteLock.writeLock().unlock();
            }
        }
    ​
        //读方法
        public void getNumber()
        {
            readWriteLock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + "-->读取到的数字是:" +  number);
            } finally {
                readWriteLock.readLock().unlock();
            }
        }
    }

    4. 原子操作(java.util.concurrent.atomic)

     

    5.并发工具包(java.util.concurrent)

    1. Semaphore:允许多个线程同时访问

    无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量semaphore却可以指定多个线程,同时访问某一个资源。

    image-20210624174047354 image-20210624174121780

    /**
    * 模拟高并发抢车位,模拟车库里面有3个车位,现在有6个车子抢占车位.同一时刻最多只能有3辆车同时抢到车位,一个车出停车场后,另外一辆车才能进来
    *
    */
    public class SemaphoreDemo1 {
    ​
        public static void main(String[] args) {
    ​
            //只允许有3个线程能同时操作统一资源
            Semaphore semaphore = new Semaphore(3);
    ​
            //创建一个6条线程的线程池
    ​
            ThreadPoolExecutor threadPool =   new ThreadPoolExecutor(3,6,3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
    ​
            for (int i = 1; i <= 6; i++) {
                
                threadPool.execute(() -> {
                    try {
                        int ParkTime = new Random().nextInt(10);
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + "抢到了车位");
                        TimeUnit.SECONDS.sleep(ParkTime);
                        System.out.println(Thread.currentThread().getName() + "停车" + ParkTime + "秒后离开了停车场");
    ​
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                });
            }
            threadPool.shutdown();
        }
    }

    image-20210624173756975

    2. CountDownLatch:倒计时器

      • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。

      • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

        主要方法如下:

        image-20210624164344461

        最常见的场景就是模拟火箭发射,在火箭发射前需要对各个设备、仪器的检查,只有各个检查都没有问题以后才能发射

        public class CountDownLacthDemo implements Runnable{
        ​
            static CountDownLatch downLatch = new CountDownLatch(10);
        ​
            @Override
            public void run() {
                //模拟检测任务
                try {
                    Thread.sleep(new Random().nextInt(10)*1000);
                    System.out.println(Thread.currentThread().getName() + "检查完毕");
        ​
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    downLatch.countDown();
                }
            }
        ​
            public static void main(String[] args) throws InterruptedException {
                ExecutorService exec = Executors.newFixedThreadPool(10);
        ​
                CountDownLacthDemo demo = new CountDownLacthDemo();
        ​
                for (int i = 0; i < 10; i++) {
                    exec.submit(demo);
                }
                //等到检查
                downLatch.await();
                System.out.println("发射");
                exec.shutdown();//关闭线程池
        ​
            }
        }

    3.CyclicBarrier: 循环栅栏

    CyclicBarrier 字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。

    image-20210625140957795

    1.主要方法
    //设置parties、count及barrierCommand属性。   
    CyclicBarrier(int):   
      
    //当await的数量到达了设定的数量后,首先执行该Runnable对象。   
    CyclicBarrier(int,Runnable):   
      
    //通知barrier已完成线程   
    await():
    2. 使用场景
      1. 10个工程师一起来公司应聘,招聘方式分为笔试和面试,首先要等人到齐后,开始笔试;笔试结束后,再一起参加面试。

    public class CyclicBarrierDemo {
    ​
        /**
         * 场景:
         * 10个工程师一起来公司应聘,招聘方式分为笔试和面试,首先要等人到齐后,开始笔试;笔试结束后,再一起参加面试。
         */
        public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{
                System.out.println("***********下面进行面试测试**************");
    ​
            });
            CyclicBarrier interviewCycle = new CyclicBarrier(10,()->{
                System.out.println("*********谢谢大家参加公司的招聘工作*********");
            });
    ​
            for (int i = 1; i <= 10; i++) {
                new Thread(()->{
                    try {
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + "笔试结束");
                        cyclicBarrier.await();
    ​
                        TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                        System.out.println(Thread.currentThread().getName() + "面试结束");
                        interviewCycle.await();
    ​
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }, "工程师"+String.valueOf(i)).start();
            }
            System.out.println("***********下面进行笔试测试**************");
        }
    }

    image-20210625145633103

    以上代码使用了2个CyclicBarrier来表笔试和面试的同步点,只有所有人的笔试完成(cyclicBarrier这是一个同步点)后才能进行面试工作,所有人的面试完成(interviewCycle这是一个同步点点)后才能宣布结束。同时使用构造函数CyclicBarrier(int parties,Runnable barrierAction)

    image-20210625144401754

    该构造函数表示线程等待的数量等于parties时,才会执行barrierAction这个方法。以上例子,当调用10次cyclicBarrier.awati()后,即第10个线程的打印出“笔试结束”这一句后就会执行cyclicBarrier.barrierAction方法里面的打印语句“下面进行面试测试”,同理调用10次interviewCycle.await()方法后,第10个线程打印出“面试结束”这一句后就会执行interviewCycle.barrierAction方法里面的打印语句“谢谢大家参加公司的招聘工作”。

    image-20210625145834329

    以上例子创建了2各不同的CyclicBarrier对象来实现,不能体现出CyclicBarrier可循环重复使用的性质,是否可以只创建一个cyclicBarrier对象就可以完成以上实例呢?

    public class CyclicBarrierDemo3 {
    ​
        /**
         * 场景:
         * 10个工程师一起来公司应聘,招聘方式分为笔试和面试,首先要等人到齐后,开始笔试;笔试结束后,再一起参加面试。
         */
        public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
    ​
            for (int i = 1; i <= 10; i++) {
                new Thread(()->{
                    try {
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + "笔试结束");
                        cyclicBarrier.await();
    ​
                        TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                        System.out.println(Thread.currentThread().getName() + "面试结束");
                        cyclicBarrier.await();
    ​
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }, "工程师"+String.valueOf(i)).start();
            }
            System.out.println("***********下面进行笔试测试**************");
            cyclicBarrier.await();
            System.out.println("***********下面进行面试测试**************");
            cyclicBarrier.await();
            System.out.println("***********谢谢大家参加公司的招聘工作**************");
        }
    }

    image-20210625145558073

    特别注意的 CyclicBarrier cyclicBarrier = new CyclicBarrier(11);为什么是11而不是10呢?因为main主线程也有调用cyclicBarrier.await();如果是10的话会造成程序一直处于运行状态,CyclicBarrier一直处于阻塞状态。

      1. CountDownLatch和CyclicBarrier综合应用

        场景:模拟10个考生参加考试的过程,具体流程如下

        • 1.首先考生在考场外等待考试开始的铃声响起

        • 2.考场main考试开始铃声响起

        • 3.考生听到铃声进入考场开始答题

        • 4.考生考试结束交卷

        • 5.等待全部考生交卷后考场考试结束

    public class CyclicBarrierDemo2 {
        
        public static void main(String[] args) {
    ​
            CyclicBarrier studentCyclicBarrier = new CyclicBarrier(11);
    ​
            CountDownLatch examCoundownLatch = new CountDownLatch(1);
    ​
            for (int i = 1; i <= 10; i++) {
                new Thread(()->{
                    try {
    ​
                        System.out.println(Thread.currentThread().getName() + "在等待考试开始的铃声响起");
                        examCoundownLatch.await();//等待考场主线程执行完毕
    ​
                        System.out.println("考生听到铃声" + Thread.currentThread().getName() + "开始答题");
                        TimeUnit.SECONDS.sleep(new Random().nextInt(10));
    ​
                        System.out.println("考生" +Thread.currentThread().getName()+ "答题完毕,开始交卷");
                        studentCyclicBarrier.await();
    ​
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                },"考生" + String.valueOf(i)).start();
            }
    ​
            //main-->考场主线程
            try {
    ​
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("考场" + Thread.currentThread().getName() +"铃声即将响起");
                    System.out.println("考场" + Thread.currentThread().getName() + "铃声响起");
                } finally {
                    examCoundownLatch.countDown();
                }
                studentCyclicBarrier.await();
    ​
                System.out.println("考生都已经交卷,"+Thread.currentThread().getName()+"考试结束");
    ​
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    image-20210625150915665

    4.线程池

    为了避免系统频繁的创建和销毁线程,可以让创建的线程进行复用。简而言之,在使用线程池后,创建线程变成了从线程池获取空闲线程,关闭线程变成了向线程池归还线程。线程池主要掌握3大方法、7大参数、4种拒绝策略

    image-20210625174757806

    1. 7大参数

    image-20210625174648714

    在这7个参数中,大多数都很简单,只有workQueue和handler需要进行详细的说明

      • workQueue: 是指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于Runnable对象。

        • SynchronousQueue: 直接提交队列

          image-20210625235603023

          SynchronousQueue没有容量,提交的任务不会被真实的保存,而总是将新任务提交给线程执行。如果没有空闲的进程(即线程数大于corePoolSize),则尝试创建新的线程,如果线程数量已经大于最大值maxmumPoolSize,则执行拒绝策略。

          线程数<= corePoolSize

          image-20210625223149385 线程数 <= maxPoolSize

          image-20210625223650002 线程数 > maxPoolSize

          image-20210625223835956

        • ArrayBlockingQueue: 有边界阻塞队列

          ArrayBlockingQueue的构造函数必须带一个参数,表示该队列的最大容量。

          image-20210625224246744

          线程数小于等于corePoolSize时,优先创建新的线程

          corePoolSize < 线程数 时,会将线程放进队列中

          如果队列已满无法,无法加入,而且线程数 <= maxPoolSize时,创建新的线程

          线程数 > maxPoolSize + capacity(队列容量), 则执行拒绝策略

          image-20210625230039963 image-20210625230112357

        • LinkedBlockingQueue: 无边界阻塞队列

          与ArryBlockingQueue队列相比,除非系统资源耗尽,否则无界队列不存在任务入队失败的情况。

          image-20210625234945271

          如果不指定阻塞队列的容量的前提下,默认容量为Integer.MAX_VALUE.

          • 线程数 <= corePoolSize时,线程池生成新的线程执行任务

          • 线程数 > corePoolSize时,任务直接进入队列,如果任务创建速度 > 处理的速度,无界队列会快速增长,直到资源耗尽

          image-20210625231047546

          如果指定了阻塞队列的长度,线程数 > maxPoolSize + capacity的话,会执行拒绝策略

          image-20210625235241666

        • PriorityBlockingQueue: 优先任务队列

          它是一个特殊的无界队列,可以根据任务自身的优先级顺序先后执行。ArrayBlockingQueue和LinkedBlockingQueue队列都是按照先进先出算法处理任务的。无参构造的话初始容量为11

          image-20210625232650445

        注意报错

        java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
            at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
            at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
            at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
            at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
            at com.sean.thread.pool.TestPool.main(TestPool.java:27)
    2. 3大方法

    《Java开发手册》中明确提出不建议使用Exectors去创建线程池,而是通过ThreadPoolExecutor去创建

    image-20210628101937962

    image-20210628101412410

    image-20210628101509867

    image-20210628101553306

    image-20210628102117874

    3. 4种拒绝策略

    JDK内置提供了四种拒绝策略:

      • AbortPolicy: 该策略会直接抛出异常,阻止系统正常工作

      • CallerRunsPolicy: 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

      • DiscardOledestPolicy: 该策略丢弃最老的一个请求,也就是将被执行的一个任务,并尝试再次提交当前任务

      • DiscardPolicy:该策略默默的丢弃无法处理的任务,不予处理。

    4. 自定义创建线程:ThreadFactory

    线程池的主要作用是为了线程复用,也就避免了线程的频繁创建。但是最开始的那些线程从何而来?答案就是ThreadFactory.

    ThreadFactory是一个接口,里面就只有一个方法,用来创建线程

    image-20210628140810725

    自定义线程池可以自定义线程的名称、组以及优先级等信息

    /**
     * 自定义线程工厂
     */
    class NamedThreadFactory implements ThreadFactory{
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        NamedThreadFactory(String name) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            if (null == name || name.isEmpty()) {
                name = "pool";
            }
            namePrefix = name + "-窗口-";
        }
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            return thread;
        }
    
    }

    线程池的execute()和submit()方法的区别 (https://www.jianshu.com/p/6bcb61fd801b)

      • 接收参数不一样

      • submit有返回值,返回一个Future对象,而execute没有返回值

      • submit方便Exception处理

    public class RunnableTestMain {
    
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            
            /**
             * execute(Runnable x) 没有返回值。可以执行任务,但无法判断任务是否成功完成。
             */
            pool.execute(new RunnableTest("Task1")); 
            
            /**
             * submit(Runnable x) 返回一个future。可以用这个future来判断任务是否成功完成。请看下面:
             */
            Future future = pool.submit(new RunnableTest("Task2"));
            
            try {
                if(future.get()==null){//如果Future's get返回null,任务完成
                    System.out.println("任务完成");
                }
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
                //否则我们可以看看任务失败的原因是什么
                System.out.println(e.getCause().getMessage());
            }
    
        }
    
    }
    
     class RunnableTest implements Runnable {
        
        private String taskName;
        
        public RunnableTest(final String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            System.out.println("Inside "+taskName);
            throw new RuntimeException("RuntimeException from inside " + taskName);
        }
    
    }

     

     

     

  • 相关阅读:
    Python与数据库
    初识matplotlib
    Jquery--实现轮播图
    Juery入门2
    CSS布局方式
    Jquery入门一
    html-DOM了解
    jquery --入门
    JS练习
    kettle 报错汇总
  • 原文地址:https://www.cnblogs.com/seanRay/p/14944707.html
Copyright © 2011-2022 走看看