zoukankan      html  css  js  c++  java
  • java_guide_9-30_并发相关

    3.1 CopyOnWriteArrayList 简介

    public class CopyOnWriteArrayList<E>
    extends Object
    implements List<E>, RandomAccess, Cloneable, Serializable

    在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问List的内部数据,毕竟读取操作是安全的。

    这和我们之前在多线程章节讲过 ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK中提供了 CopyOnWriteArrayList 类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?

    3.2 CopyOnWriteArrayList 是如何做到的?

    CopyOnWriteArrayList 类的所有可变操作(add,set等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

    从 CopyOnWriteArrayList 的名字就能看出CopyOnWriteArrayList 是满足CopyOnWrite 的ArrayList,所谓CopyOnWrite 也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。

    六 ConcurrentSkipListMap

    为了引出ConcurrentSkipListMap,先带着大家简单理解一下跳表。

    对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

    跳表的本质是同时维护了多个链表,并且链表是分层的,

    最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。

    跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素18。

    查找18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。

    从上面很容易看出,跳表是一种利用空间换时间的算法。

    使用跳表实现Map 和使用哈希算法实现Map的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是ConcurrentSkipListMap。


    1 AQS 简单介绍

    AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。

    AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

    2.1 AQS 原理概览

    AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

    CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。


    3 Semaphore(信号量)-允许多个线程同时访问

    synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。 示例代码如下:

    /**
     * 
     * @author Snailclimb
     * @date 2018年9月30日
     * @Description: 需要一次性拿一个许可的情况
     */
    public class SemaphoreExample1 {
      // 请求的数量
      private static final int threadCount = 550;
    
      public static void main(String[] args) throws InterruptedException {
        // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
        ExecutorService threadPool = Executors.newFixedThreadPool(300);
        // 一次只能允许执行的线程数量。
        final Semaphore semaphore = new Semaphore(20);
    
        for (int i = 0; i < threadCount; i++) {
          final int threadnum = i;
          threadPool.execute(() -> {// Lambda 表达式的运用
            try {
              semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
              test(threadnum);
              semaphore.release();// 释放一个许可
            } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
            }
    
          });
        }
        threadPool.shutdown();
        System.out.println("finish");
      }
    
      public static void test(int threadnum) throws InterruptedException {
        Thread.sleep(1000);// 模拟请求的耗时操作
        System.out.println("threadnum:" + threadnum);
        Thread.sleep(1000);// 模拟请求的耗时操作
      }
    }
    View Code

    除了 acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回false。

    4 CountDownLatch (倒计时器)

    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。在Java并发中,countdownlatch的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

    4.1 CountDownLatch 的三种典型用法

    ①某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

    ②实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

    ③死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

    /**
     * 
     * @author SnailClimb
     * @date 2018年10月1日
     * @Description: CountDownLatch 使用方法示例
     */
    public class CountDownLatchExample1 {
      // 请求的数量
      private static final int threadCount = 550;
    
      public static void main(String[] args) throws InterruptedException {
        // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
        ExecutorService threadPool = Executors.newFixedThreadPool(300);
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
          final int threadnum = i;
          threadPool.execute(() -> {// Lambda 表达式的运用
            try {
              test(threadnum);
            } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
            } finally {
              countDownLatch.countDown();// 表示一个请求已经被完成
            }
    
          });
        }
        countDownLatch.await();
        threadPool.shutdown();
        System.out.println("finish");
      }
    
      public static void test(int threadnum) throws InterruptedException {
        Thread.sleep(1000);// 模拟请求的耗时操作
        System.out.println("threadnum:" + threadnum);
        Thread.sleep(1000);// 模拟请求的耗时操作
      }
    }

    上面的代码中,我们定义了请求的数量为550,当这550个请求被处理完成之后,才会执行System.out.println("finish");

    与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

    其他N个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。


    5 CyclicBarrier(循环栅栏)

    CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

    5.1 CyclicBarrier 的应用场景

    CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

    /**
     * 
     * @author Snailclimb
     * @date 2018年10月1日
     * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
     */
    public class CyclicBarrierExample2 {
      // 请求的数量
      private static final int threadCount = 550;
      // 需要同步的线程数量
      private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    
      public static void main(String[] args) throws InterruptedException {
        // 创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
    
        for (int i = 0; i < threadCount; i++) {
          final int threadNum = i;
          Thread.sleep(1000);
          threadPool.execute(() -> {
            try {
              test(threadNum);
            } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
            } catch (BrokenBarrierException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
            }
          });
        }
        threadPool.shutdown();
      }
    
      public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
        System.out.println("threadnum:" + threadnum + "is ready");
        try {
          /**等待60秒,保证子线程完全执行结束*/  
          cyclicBarrier.await(60, TimeUnit.SECONDS);
        } catch (Exception e) {
          System.out.println("-----CyclicBarrierException------");
        }
        System.out.println("threadnum:" + threadnum + "is finish");
      }
    
    }
    View Code
    threadnum:0is ready
    threadnum:1is ready
    threadnum:2is ready
    threadnum:3is ready
    threadnum:4is ready
    threadnum:4is finish
    threadnum:0is finish
    threadnum:1is finish
    threadnum:2is finish
    threadnum:3is finish
    threadnum:5is ready
    threadnum:6is ready
    threadnum:7is ready
    threadnum:8is ready
    threadnum:9is ready
    threadnum:9is finish
    threadnum:5is finish
    threadnum:8is finish
    threadnum:7is finish
    threadnum:6is finish
    ......

    可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await方法之后的方法才被执行。

    另外,CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。示例代码如下:

    /**
     * 
     * @author SnailClimb
     * @date 2018年10月1日
     * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
     */
    public class CyclicBarrierExample3 {
      // 请求的数量
      private static final int threadCount = 550;
      // 需要同步的线程数量
      private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
        System.out.println("------当线程数达到之后,优先执行------");
      });
    
      public static void main(String[] args) throws InterruptedException {
        // 创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
    
        for (int i = 0; i < threadCount; i++) {
          final int threadNum = i;
          Thread.sleep(1000);
          threadPool.execute(() -> {
            try {
              test(threadNum);
            } catch (InterruptedException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
            } catch (BrokenBarrierException e) {
              // TODO Auto-generated catch block
              e.printStackTrace();
            }
          });
        }
        threadPool.shutdown();
      }
    
      public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
        System.out.println("threadnum:" + threadnum + "is ready");
        cyclicBarrier.await();
        System.out.println("threadnum:" + threadnum + "is finish");
      }
    
    }
    View Code
    threadnum:0is ready
    threadnum:1is ready
    threadnum:2is ready
    threadnum:3is ready
    threadnum:4is ready
    ------当线程数达到之后,优先执行------
    threadnum:4is finish
    threadnum:0is finish
    threadnum:2is finish
    threadnum:1is finish
    threadnum:3is finish
    threadnum:5is ready
    threadnum:6is ready
    threadnum:7is ready
    threadnum:8is ready
    threadnum:9is ready
    ------当线程数达到之后,优先执行------
    threadnum:9is finish
    threadnum:5is finish
    threadnum:6is finish
    threadnum:8is finish
    threadnum:7is finish
    ......

    CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

  • 相关阅读:
    1334: 好老师
    poj 2255 Tree Recovery
    2006浙大:简单计算器
    POJ1001(C++处理大数)
    HDU2159(二维完全背包)
    POJ2080:Calendar(计算日期)
    2008上交:Day of Week
    POJ1365:质因数分解
    VIJOS:P1706(舞会)
    POJ2449:K短路
  • 原文地址:https://www.cnblogs.com/ustc-anmin/p/11613626.html
Copyright © 2011-2022 走看看