zoukankan      html  css  js  c++  java
  • 13-JUC(下)

    1. 同步器

    1.1 CountDownLatch

    • 当一个或多个线程调用 await() 时,这些线程会阻塞;
    • 其它线程调用 countDown() 会将计数器减 1(调用该方法的线程不会阻塞);
    • 当计数器的值变为 0 时(减少计数),因 await() 阻塞的线程会被唤醒,继续执行。
    // 案例:秦灭六国
    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            // [减少计数] 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒
            CountDownLatch cd = new CountDownLatch(6);
    
            for (int i = 1; i <= 6; i++) {
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName() + "被灭了 ...");
                    cd.countDown();
                }, String.valueOf(i)).start();
            }
    
            cd.await(); // 等待计数器归零,然后再向下执行。
            System.out.println(Thread.currentThread().getName() + "一统天下 ...");
        }
    }
    

    1.2 CyclicBarrier

    CyclicBarrier 的字面意思是可循环(Cyclic) 使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫“同步点”)时被阻塞(线程进入屏障是通过 await() 方法),直到最后一个线程到达屏障时,屏障才会开门。只有等屏障开了,所有被屏障拦截的线程才会继续干活。

    示例代码:

    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
                System.out.println("七龙珠集齐!召唤神龙!");
            });
    
            for (int i = 1; i <= 7; i++) {
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"星龙珠被收集");
                    try {
                        cyclicBarrier.await(); // 线程阻塞
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"星龙珠合成...");
                }, String.valueOf(i)).start();
            }
        }
    }
    

    打印结果:

    2星龙珠被收集
    5星龙珠被收集
    4星龙珠被收集
    1星龙珠被收集
    3星龙珠被收集
    7星龙珠被收集
    6星龙珠被收集
    七龙珠集齐!召唤神龙!
    6星龙珠合成...
    4星龙珠合成...
    5星龙珠合成...
    2星龙珠合成...
    7星龙珠合成...
    3星龙珠合成...
    1星龙珠合成...
    

    1.3 Semaphore

    信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。在信号量上我们定义 2 种操作:

    • acquire(获取):当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减 1),要么一直等下去,直到有线程释放信号量或超时。
    • release(释放):实际上会将信号量的值加 1,然后唤醒等待的线程。

    示例代码:

    public class SemaphoreDemo {
        public static void main(String[] args) {
            // 3 个停车位
            Semaphore sp = new Semaphore(3);
            // 6 辆汽车
            for (int i = 1; i <= 6; i++) {
                new Thread(()->{
                    try {
                        sp.acquire();
                        System.out.println(Thread.currentThread().getName() + "号车驶入停车位");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + "号车驶出停车位");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        sp.release();
                    }
                }, String.valueOf(i)).start();
            }
        }
    }
    

    打印结果:

    1号车驶入停车位
    3号车驶入停车位
    2号车驶入停车位
    1号车驶出停车位
    3号车驶出停车位
    4号车驶入停车位
    2号车驶出停车位
    6号车驶入停车位
    5号车驶入停车位
    6号车驶出停车位
    5号车驶出停车位
    4号车驶出停车位
    

    2. 读写锁

    class MyCache {
        private volatile Map<String, Object> map = new HashMap<>();
        
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        
        public void put(String key, Object value) {
            lock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + "开始写入 ...");
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + "写入完毕 ...");
            } finally {
                lock.writeLock().unlock();
            }
        }
        
        public Object get(String key) {
            lock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + "开始读入 ...");
                Object value = map.get(key);
                System.out.println(Thread.currentThread().getName() + "读入完毕 ...");
                return value;
            } finally {
                lock.readLock().unlock();
            }
            
        }
    }
    

    3. 阻塞队列

    3.1 概念

    • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。
    • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。

    3.2 用处

    在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。

    为什么需要 BlockingQueue?

    好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。

    在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

    3.3 架构&种类

    • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
    • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列
    • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
    • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
    • SynchronousQueue:同步队列,不存储元素的阻塞队列,也即单个元素的队列
    • LinkedTransferQueue:由链表组成的无界阻塞队列
    • LinkedBlockingDeque:由链表组成的双向阻塞队列

    3.4 示例代码

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class BlockQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            // 1. 抛出异常
            // throwExTest();
            // 2. 特殊值
            // retBoolTest();
            // 3. 阻塞
            // blockingTest();
            // 4. 超时退出
            timeoutTest();
        }
    
        private static void timeoutTest() throws InterruptedException {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
            System.out.println(queue.offer("a")); // true
            System.out.println(queue.offer("b")); // true
            System.out.println(queue.offer("c")); // true
            System.out.println(queue.offer("d", 3, TimeUnit.SECONDS)); // false
            System.out.println(queue.poll()); // a
            System.out.println(queue.poll()); // b
            System.out.println(queue.poll()); // c
            System.out.println(queue.poll(4, TimeUnit.SECONDS)); // null
        }
    
        private static void blockingTest() throws InterruptedException {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
            queue.put("a");
            queue.put("b");
            queue.put("c");
            // queue.put("d"); -> blocking ...
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
            // System.out.println(queue.take()); -> blocking ...
        }
    
    
        private static void retBoolTest() {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
            System.out.println(queue.offer("a")); // true
            System.out.println(queue.offer("b")); // true
            System.out.println(queue.offer("c")); // true
            System.out.println(queue.peek()); // a
            System.out.println(queue.offer("d")); // false
            System.out.println(queue.poll()); // a
            System.out.println(queue.poll()); // b
            System.out.println(queue.poll()); // c
            System.out.println(queue.poll()); // null
        }
    
        private static void throwExTest() {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
            // 1. 添加
            System.out.println(queue.add("a")); // true
            System.out.println(queue.add("b")); // true
            System.out.println(queue.add("c")); // true
            System.out.println(queue.element()); // a
            // System.out.println(queue.add("d")); -> IllegalStateException: Queue full
    
            // 2. 移除
            System.out.println(queue.remove());
            System.out.println(queue.remove());
            System.out.println(queue.remove());
            // System.out.println(queue.remove()); -> NoSuchElementException
        }
    }
    

    4. AQS

    5. 线程池

    5.1 引入

    10 年前单核 CPU 电脑,假的多线程,CPU 需要来回切换;现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。

    线程池的优势:线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果任务数量超过了线程最大数量,超出数量的任务排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

    线程池的主要特点:线程复用;控制最大并发数;管理线程

    1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
    2. 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
    3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。

    5.2 使用

    5.2.1 架构说明

    Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类。

    5.2.2 有关 API

    • Executors.newSingleThreadExecutor():一个任务一个任务的执行,一池一线程。
    • Executors.newFixedThreadPool(N):执行长期任务性能好,创建一个有 N 个固定的线程的线程池。
    • Executors.newCachedThreadPool():执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强。

    示例代码:

    public class ThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(3); // 固定个数
            ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); // 单个
            ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 可扩展
            for (int i = 1; i <= 500; i++) {
                threadPool3.execute(()->{
                    System.out.println(Thread.currentThread().getName() + "处理任务");
                });
            }
            threadPool.shutdown();
            threadPool2.shutdown();
            threadPool3.shutdown();
        }
    }
    

    5.2.3 底层源码

    ThreadPoolExecutor

    public ThreadPoolExecutor(
                int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {
    
        if (corePoolSize < 0 || maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
    
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    1. corePoolSize:线程池中的常驻核心线程数(惰性加载)
    2. maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须 >= 1。
    3. keepAliveTime:多余的空闲线程的存活时间。当池中线程数量超过 corePoolSize 并且线程空闲时间达到 keepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止。
    4. unit:keepAliveTime 的单位
    5. workQueue:任务队列,被提交但尚未被执行的任务。
    6. threadFactory:表示生成线程池中工作线程的线程工厂。用于创建线程,一般默认的即可。
    7. handler:拒绝策略。表示当队列满了,并且工作线程大于等于线程池的 maximumPoolSize 时如何来拒绝请求执行的 Runnable 的策略。

    在工作中单一的/固定数的/可变的 3 种创建线程池的方法哪个用的多?注意,哪个都不用!

    5.3 工作原理

    1. 在创建了线程池后,线程池中的线程数为 0。

    2. 当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:

    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    • 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个新来的任务
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

    3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

    4. 当一个线程无事可做超过一定的时间(keepAliveTime) 时,线程会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以,当线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

    5.4 拒绝策略

    等待队列已经排满了,再也塞不下新任务了。同时,线程池中的 max 线程也达到了,无法继续为新任务服务。这个时候我们就需要拒绝策略机制合理的处理这个问题。

    示例代码:20 个任务

    public class ThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,
                    5,
                    3L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            try {
                for (int i = 0; i < 20; i++) {
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName());
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    }
    

    JDK内置的拒绝策略(以下均实现了 RejectedExecutionHandle<I>

    • AbortPolicy(默认):直接抛出 RejectedExecutionException 阻止系统正常运行。
      pool-1-thread-1
      pool-1-thread-4
      pool-1-thread-1
      pool-1-thread-1
      pool-1-thread-3
      pool-1-thread-2
      pool-1-thread-4
      pool-1-thread-5
      java.util.concurrent.RejectedExecutionException: Task cn.edu.nuist.threadpool
          .ThreadPoolTest$$Lambda$1/821270929@85ede7b rejected from java.util.concurrent
          .ThreadPoolExecutor@5674cd4d
          [Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
      
    • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
      pool-1-thread-1
      pool-1-thread-3
      pool-1-thread-2
      main <- 将某些任务回退到调用者
      pool-1-thread-3
      pool-1-thread-1
      pool-1-thread-3
      pool-1-thread-5
      main <- 将某些任务回退到调用者
      pool-1-thread-4
      pool-1-thread-2
      pool-1-thread-3
      pool-1-thread-1
      pool-1-thread-2
      pool-1-thread-4
      pool-1-thread-1
      pool-1-thread-2
      pool-1-thread-3
      pool-1-thread-1
      pool-1-thread-4
      
    • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
      pool-1-thread-1
      pool-1-thread-4
      pool-1-thread-3
      pool-1-thread-3
      pool-1-thread-2
      pool-1-thread-1
      pool-1-thread-4
      pool-1-thread-5
      
    • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
      pool-1-thread-1
      pool-1-thread-3
      pool-1-thread-2
      pool-1-thread-1
      pool-1-thread-4
      pool-1-thread-4
      pool-1-thread-4
      pool-1-thread-4
      pool-1-thread-5
      pool-1-thread-4
      pool-1-thread-3
      pool-1-thread-4
      pool-1-thread-5
      pool-1-thread-3
      

    5.5 如何设计线程池

    5.5.1 回答思路

    摘自公众号:yes的练级攻略

    这种设计类问题还是一样,先说下理解,表明你是知道这个东西的用处和原理的,然后开始 BB。基本上就是按照现有的设计来说,再添加一些个人见解。

    线程池讲白了就是存储线程的一个容器,池内保存之前建立过的线程来重复执行任务,减少创建和销毁线程的开销,提高任务的响应速度,并便于线程的管理。

    我个人觉得如果要设计一个线程池的话得考虑池内工作线程的管理、任务编排执行、线程池超负荷处理方案、监控等方面。

    要将初始化线程数、核心线程数、最大线程池都暴露出来可配置,包括超过核心线程数的线程空闲消亡相关配置。

    然后任务的存储结构也得可配置,可以是无界队列也可以是有界队列,也可以根据配置,分多个队列来分配不同优先级的任务,也可以采用 stealing 的机制来提高线程的利用率。

    再提供配置来表明此线程池是 IO 密集型还是 CPU 密集型来改变任务的执行策略。

    超负荷的方案可以有多种,包括丢弃任务、拒绝任务并抛出异常、丢弃最旧的任务或自定义等等。

    至于监控的话,线程池设计要埋好点,暴露出用于监控的接口,如已处理任务数、待处理任务数、正在运行的线程数、拒绝的任务数等等信息。

    我觉得基本上这样答就差不多了,等着面试官的追问就好。注意不需要跟面试官解释什么叫核心线程数之类的,都懂的没必要。

    当然这种开放型问题还是仁者见仁智者见智,我这个不是标准答案,仅供参考。建议把线程池相关的关键字都要说出来,表面你对线程池的内部原理的理解是透彻的。

    5.5.2 IO/CPU 密集型

    https://blog.csdn.net/youanyyou/article/details/78990156

    • CPU 密集型(CPU-bound)
      • CPU 密集型也叫计算密集型,指的是系统的硬盘、内存性能相对 CPU 要好很多,此时,系统运作大部分的状况是 CPU Loading 100%,CPU 要读/写 I/O(硬盘/内存),I/O 在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU Loading 很高。
      • 在多重程序系统中,大部份时间用来做计算、逻辑判断等 CPU 动作的程序称之 CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于 CPU bound 的程序。CPU bound 的程序一般而言 CPU 占用率相当高。这可能是因为任务本身不太需要访问 I/O 设备,也可能是因为程序是多线程实现因此屏蔽掉了等待 I/O 的时间。
    • IO 密集型(I/O bound)
      • IO 密集型指的是系统的 CPU 性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是 CPU 在等 I/O (硬盘/内存) 的读/写操作,此时 CPU Loading 并不高。
      • I/O bound 的程序一般在达到性能极限时,CPU 占用率仍然较低。这可能是因为任务本身需要大量 I/O 操作,而 pipeline 做得不是很好,没有充分利用处理器能力。

    CPU密集型 vs IO密集型

    我们可以把任务分为计算密集型和 IO 密集型。

    计算密集型任务的特点是要进行大量的计算,消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等等,全靠 CPU 的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,所以,要最高效地利用 CPU,计算密集型任务同时进行的数量应当等于 CPU 的核心数。计算密集型任务由于主要消耗 CPU 资源,因此,代码运行效率至关重要。Python 这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用 C 语言编写。

    涉及到网络、磁盘 IO 的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 IO 操作完成(因为 IO 的速度远远低于 CPU 和内存的速度)。对于 IO 密集型任务,任务越多,CPU 效率越高,但也有一个限度。常见的大部分任务都是 IO 密集型任务,比如 Web 应用。IO 密集型任务执行期间,99% 的时间都花在 IO 上,花在 CPU 上的时间很少,因此,用运行速度极快的 C 语言替换用 Python 这样运行速度极低的脚本语言,完全无法提升运行效率。对于 IO 密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C 语言最差。

    总之,计算密集型程序适合 C 语言多线程,I/O 密集型适合脚本语言开发的多线程。

    6. 分支合并框架

    Fork:把一个复杂任务进行分拆,大事化小;Join:把分拆任务的结果进行合并。

    • ForkJoinPool:分支合并池
    • ForkJoinTask:类比“FutureTask”
    • RecursiveTask:递归任务,继承后可以实现递归调用的任务

    public class ForkJoinDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            MyTask myTask = new MyTask(0, 100);
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
            System.out.println(forkJoinTask.get());
        }
    }
    
    class MyTask extends RecursiveTask<Integer> {
        public static final int CALCULATE_RANGE = 10;
        private int begin;
        private  int end;
        private int result;
    
        public MyTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            if ((end - begin) <= CALCULATE_RANGE) {
                for (int i = begin; i <= end; i++) {
                    result += i;
                }
            } else {
                int mid = (begin + end) >> 1;
                MyTask task1 = new MyTask(begin, mid);
                MyTask task2 = new MyTask(mid+1, end);
                task1.fork();
                task2.fork();
                result = task1.join() + task2.join();
            }
            return result;
        }
    }
    

    7. 异步回调

    public class CompletableFutureDemo {
    
        public static void main(String[] args) throws Exception {
    
            CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
                System.out.println(Thread.currentThread().getName()+"	 runAsync");
            });
            completableFuture1.get(); // 阻塞直至获取结果
    
            CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread().getName()+"	 supplyAsync");
                int i = 10/0;
                return 1024;
            });
    
            // 异步回调
            completableFuture2.whenComplete((correctRet, exceptionInfo) -> {
                System.out.println("correctRet=" + correctRet);
                System.out.println("exceptionInfo=" + exceptionInfo);
            }).exceptionally(e -> {
                System.out.println("exception:" + e.getMessage());
                return 1101;
            }).get();
        }
    }
    
  • 相关阅读:
    JS运动---运动基础(匀速运动)
    浅谈浏览器解析 URL+DNS 域名解析+TCP 三次握手与四次挥手+浏览器渲染页面
    浅谈JS重绘与回流
    浅谈JS函数节流及应用场景
    浅谈JS函数防抖及应用场景
    前端模块化(CommonJs,AMD和CMD)
    Git之SSH公钥与私钥
    vi/vim编辑器必知必会
    git笔录
    vue移动端弹框组件,vue-layer-mobile
  • 原文地址:https://www.cnblogs.com/liujiaqi1101/p/14793222.html
Copyright © 2011-2022 走看看