zoukankan      html  css  js  c++  java
  • Java高并发程序设计(五)—— JDK并发包1

    一、各种同步控制工具的使用

    (一)ReentrantLock

    1. 概述

    ReentrantLock(重入锁)是synchronized关键字的替代品,或者说是增强版。synchronized关键字特点是使用简单,但是功能薄弱,因为它只能做到多个线程同时想进入临界区是,让不能进入临界区的线程做一个等待,这个等待是一个死等,只有当前面的线程离开临界区之后,它才能竞争进入。但是ReentrantLock提供了更多的选择性,在JDK1.5之前,由于JVM对synchronized关键字优化并不够充分,导致ReentrantLock重入锁性能要好于synchronized关键字,但是在做了充分的优化之后,在现在的JDK版本之中,两者性能不相上下。所以如果只是一个简单的功能实现,没有必要去追求比较复杂或者高级的ReentrantLock。ReentrantLock与synchronized相比,除了实现了普通的锁的功能之外,它还实现了可重入、可中断、可限时、公平锁这些特点。

    2. 可重入

    单线程可以重复进入,但要重复退出。

    就是ReentrantLock对于同一个线程来讲,它必须是可重入的,否则会出现一个线程把自己给卡死的状态。

    代码示例:

    ReentrantLock1:

     1 import java.util.concurrent.locks.ReentrantLock;
     2 
     3 public class ReentrantLock1 implements Runnable{
     4     public static ReentrantLock lock = new ReentrantLock();
     5     public static int i = 0;
     6     
     7     @Override
     8     public void run() {
     9         for(int j = 0; j<10000; j++) {
    10             lock.lock();
    11             System.out.println(Thread.currentThread().getName());
    12             try {
    13                 i++;
    14             }finally {
    15                 lock.unlock();
    16             }
    17         }
    18     }
    19 
    20     public static void main(String[] args) throws InterruptedException {
    21         ReentrantLock1 l1 = new ReentrantLock1();
    22         Thread t1 = new Thread(l1,"t1");
    23         Thread t2 = new Thread(l1,"t2");
    24         t1.start();
    25         t2.start();
    26         t1.join();
    27         t2.join();
    28         System.out.println(i);
    29     }
    30 }

    ReentrantLock2:

     1 import java.util.concurrent.locks.ReentrantLock;
     2 
     3 public class ReentrantLock2 implements Runnable{
     4     private static ReentrantLock lock = new ReentrantLock();
     5     private static int i = 0;
     6     @Override
     7     public void run() {
     8         for(int j = 0; j < 10000; j++) {
     9             lock.lock();
    10             lock.lock();
    11             try {
    12                 i++;
    13             } finally {
    14                 lock.unlock();
    15                 lock.unlock();
    16             }
    17         }
    18     }
    19     
    20     public static void main(String[] args) throws InterruptedException {
    21         ReentrantLock2 l2 = new ReentrantLock2();
    22         Thread t1 = new Thread(l2);
    23         Thread t2 = new Thread(l2);
    24         t1.start();
    25         t2.start();
    26         t1.join();
    27         t2.join();
    28         System.out.println(i);
    29     }
    30 
    31 }

    在ReenterLock类中,有一个静态变量i,在多个线程中要对i做++操作,直接对变量i做++操作,一定不是线程安全的,所以我们加锁,锁就是ReentrantLock,我们现在有两个线程t1和t2,t1和t2都会对它做一个++操作,我们每次只允许一个线程对它做++,每次在++之前,在lock锁上做一个lock()操作,离开之后要做一个unlock()操作,这种写法是使用ReentrantLock的基本范式,把unlock写在finally里面,是为了防止你发生了某些意外,导致这个锁没有释放掉,如果这个锁没有释放掉,后果是很严重的,会导致其它的线程都没有办法进来,万一说执行过程中抛出了一个异常,但是异常又没有处理,那么后果就非常严重。finally是一个万无一失的做法,无论程序怎样退出,都会被执行。所以在finally里面把锁释放掉。

    相对于synchronized来讲,ReentrantLock显得有些复杂。synchronized这个锁的释放是由虚拟机完成的一个自动的动作,我们只要把synchronized大括号括起来就可以了,所以只要语法上检查是通过的,synchronized会把锁释放掉。但是ReentrantLock控制在什么什么时间释放锁,从这点来讲,提供了锁释放的灵活性,你可以在任何场景下去释放,但是付出的代价是要额外小心,不能忘记把锁释放掉,如果忘了释放,其它线程就永远进不来了。

    重入锁:如果在一个线程中有可能两次对这个锁进行加锁,如果出现这种情况,必须对这个锁释放两次。如果少释放了一次,会出现线程等待的现象。重入锁里面记录的是你拿了这把锁多少个许可,就必须释放相同次数,否则导致其它线程拿不到这个许可。

    3. 可中断

    lockInterruptibly()

    重入锁是可以被中断的,它不像synchronized关键字对中断是没有响应的,对中断没有响应的一个后果就是如果发生了死锁或者长期等待的情况(这不是我们想看的情况),这时候如果我们想让线程停下来,这时候可行的办法是给线程发送中断信号,让线程停下来。重入锁提供了一个功能,在加锁的同时响应你的中断,这样如果发生了死锁或者预料之外的情况,在一个锁上卡了很久,那么还有办法把线程中断掉,而不至于永久性的把线程卡死在那里。

    DeadLock类:

     1 import java.lang.management.ManagementFactory;
     2 import java.lang.management.ThreadInfo;
     3 import java.lang.management.ThreadMXBean;
     4 
     5 public class DeadLockChecker {
     6     private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
     7     final static Runnable deadlockCheck = new Runnable() {
     8         @Override
     9         public void run() {
    10             while(true) {
    11                 long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
    12                 if(deadlockedThreadIds != null) {
    13                     ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
    14                     for(Thread t : Thread.getAllStackTraces().keySet()) {
    15                         for(int i = 0; i < threadInfos.length; i++) {
    16                             if(t.getId() == threadInfos[i].getThreadId()) {
    17                                 t.interrupt();
    18                             }
    19                         }
    20                     }
    21                 }
    22                 try {
    23                     Thread.sleep(500);
    24                 } catch(InterruptedException e) {
    25                     
    26                 }
    27             }
    28         }
    29     };
    30     
    31     public static void check() {
    32         Thread t = new Thread(deadlockCheck);
    33         t.setDaemon(true);
    34         t.start();
    35     }
    36 }

    死锁检查,中断这两个线程。起了一个线程做死锁检查,这个线程设置为守护线程。因为这是一个完全跟业务没有关系,我只是做死锁检查而已,如果整个程序都退出了,我就没有必要做死锁检查了,所以说我这个线程存在与否,不应该决定虚拟机是否退出,所以把它设置为一个守护线程。如果我检查到了死锁,我就把这个线程中断掉。

    ReenterLockInt类:

     1 import java.util.concurrent.locks.ReentrantLock;
     2 
     3 import com.sishisan.ch5.deadlock.DeadLockChecker;
     4 
     5 public class ReenterLockInt implements Runnable {
     6 
     7     public static ReentrantLock lock1 = new ReentrantLock();
     8     public static ReentrantLock lock2 = new ReentrantLock();
     9     int lock;
    10     /**
    11      * 控制加锁顺序,方便构成死锁
    12      * @param lock
    13      */
    14     public ReenterLockInt(int lock) {
    15         this.lock = lock;
    16     }
    17     
    18     
    19     @Override
    20     public void run() {
    21         try {
    22             if (lock == 1) {
    23                 lock1.lockInterruptibly();
    24                 try {
    25                     Thread.sleep(500);
    26                 } catch (Exception e) {
    27                     e.printStackTrace();
    28                 }
    29                 lock2.lockInterruptibly();
    30             }else {
    31                 lock2.lockInterruptibly();
    32                 try {
    33                     Thread.sleep(500);
    34                 } catch (Exception e) {
    35                     e.printStackTrace();
    36                 }
    37                 lock1.lockInterruptibly();
    38             }
    39         } catch(Exception e) {
    40             e.printStackTrace();
    41         }finally {
    42             if(lock1.isHeldByCurrentThread())
    43                 lock1.unlock();
    44             if(lock2.isHeldByCurrentThread())
    45                 lock2.unlock();
    46             System.out.println(Thread.currentThread().getId() + ":线程退出");
    47         }
    48     }
    49     
    50     public static void main(String[] args) throws InterruptedException {
    51         ReenterLockInt r1 = new ReenterLockInt(1);
    52         ReenterLockInt r2 = new ReenterLockInt(2);
    53         Thread t1 = new Thread(r1);
    54         Thread t2 = new Thread(r2);
    55         t1.start();
    56         t2.start();
    57         Thread.sleep(1000);
    58         //中断其中一个线程
    59         DeadLockChecker.check();
    60     }
    61 }

    lock.lockInterruptibly()表示一个可中断的加锁。如果只是简单的lock方法,锁lock1(或lock2)是不会响应中断的。只有使用了lockInterruptibly方法加锁,才会响应中断。获得一个把锁,除非这个线程被中断,被中断后就会抛出一个InterruptedException异常。

    本程序中有两把锁(lock1和lock2),两把锁的原因是在程序中构造一个死锁的现象。开启两个线程(t1和t2),一个线程赋值1,一个线程赋值2。实例变量lock等于初始值,注意lock是实例变量,不是静态变量,所以两个实例可以赋不同的值。当lock==1的时候,锁lock1,当lock==2的时候,锁lock2。因此像这样的情况,会导致,线程1先锁了lock1,线程2先锁了lock2。同时,线程1申请lock2,线程2申请lock1。因此产生了很明显的死锁。对这个死锁来讲,使用lock方法,是不太容易把它解开。但是使用lockInterruptibly就可以对两个线程发生一个中断,导致这个程序依然可以顺利结束。在catch里面,可以打印中断异常,也可以做一些补救措施。在finally里面,如果还持有这把锁,就把这把锁解掉。

    4. 可限时

    超时不能获得锁,就返回false,不会永久等待构成死锁。

    可限时也是一种避免死锁和长期等待的有效措施。

     1 import java.util.concurrent.TimeUnit;
     2 import java.util.concurrent.locks.ReentrantLock;
     3 
     4 public class TimeLock implements Runnable {
     5     public static ReentrantLock lock = new ReentrantLock();
     6     @Override
     7     public void run() {
     8         try {
     9             if(lock.tryLock(5, TimeUnit.SECONDS)) {
    10                 Thread.sleep(6000);
    11             } else {
    12                 System.out.println("get lock failed");
    13             }
    14         } catch (InterruptedException e) {
    15             e.printStackTrace();
    16         } finally {
    17             if(lock.isHeldByCurrentThread()) {
    18                 lock.unlock();
    19             }
    20         }
    21     }
    22     
    23     public static void main(String[] args) {
    24         TimeLock tl = new TimeLock();
    25         Thread t1 = new Thread(tl);
    26         Thread t2 = new Thread(tl);
    27         t1.start();
    28         t2.start();
    29     }    
    30 }

    可限时使用tryLock,我希望获得一把锁,但是我给它一个时间限制,如果说我在有效的时间内没有拿到这把锁,我就认为申请锁失败,我就可以做其它事情去了,否则,我就可以拿到这把锁,继续执行。

    在这种情况下也是可以避免死锁的,因为死锁也必然会产生一个无限期等待。如果指定了时间,例如5秒钟,如果5秒钟拿不到锁就返回了,就把锁的申请释放掉。我们在锁申请失败的时候,可以做一些补救措施。比方说自己拿不到资源了,这时候就应该释放自己已经占用的资源,使得别人能够拿到你占用的资源,尽可能避免等待的现象。

    tryLock有两个参数,第一个参数就是timeout时间,你准备在这个锁上等多长时间数值,第二个参数就是时间单位。

    本程序示例中,一个线程会占用6秒钟时间,5秒钟后必然会导致另外一个线程申请锁失败。

    5. 公平锁

    先来先得

    public ReentrantLock(boolean fair);

    public static ReentrantLock fairLock = new ReentrantLock(true);

    ReentrantLock内部支持公平锁,构造函数参数fair表示这把锁是不是想要成为一把公平锁。

    所谓公平锁就是指保证线程先来先到,后来后得。在一般意义上,锁是不公平的,也就是先来申请锁的线程未必先得到锁,后来申请锁的线程未必后得到锁,在不公平状态下,有可能导致有些线程一致拿不到锁,产生饥饿现象。公平锁不会有这种问题,公平锁来说,先到的线程先拿到锁,后到的线程后拿到锁,但是公平锁虽然不会产生饥饿,但是公平锁的性能是要比非公平锁差很多,因为公平锁自己还要处理排队问题,所以如果没有特别的需求,我们没有必要使用公平锁的现象,在默认情况下,都是非公平锁。

    (二) Condition

    ReentrantLock跟Condition的关系,如同synchronized和Object.wait(),Object.notify()的关系类似。Object.wait()和Object.notify()就必须先获得monitor的所有权。如果没有得到所有权,就不能做这个对象做wait和notify操作。

    condition相当于在一个锁上wait和notify操作,因此在Condition里面也要获得这个锁。

     1. 概述

    类似于Object.wait()和Object.notify()

    与ReentrantLock结合使用

    2. 主要接口

    (1)void await() throws InterruptedException;

    等待,等待在condition上,等待在这把锁上,线程就会被挂起。

    (2)void awaitUniterruptibly();

    (3)long awaitNanos(long nanosTimeout) throws InterruptedException;

    (4)boolean await(long time, TimeUnit unit) throws InterruptedException;

    在某一个等待的锁上设定等待多长时间。

    (5)boolean awaitUntil(Date deadline) throws InterruptedException;

    (6)void signal();

    signal是通知,通知等待在condition上的那个线程继续往下走。

    (7)void signalAll();

    相当于notifyAll。

    代码示例ReenterLockCondition类:

     1 import java.util.concurrent.locks.Condition;
     2 import java.util.concurrent.locks.ReentrantLock;
     3 
     4 public class ReenterLockCondition implements Runnable{
     5     public static ReentrantLock lock = new ReentrantLock();
     6     public static Condition condition = lock.newCondition();
     7     
     8     @Override
     9     public void run() {
    10         try {
    11             lock.lock();
    12             condition.await();
    13             System.out.println("Thread is going on");
    14         } catch(InterruptedException e) {
    15             e.printStackTrace();
    16         } finally {
    17             lock.unlock();
    18         }
    19     }
    20 
    21     public static void main(String[] args) throws InterruptedException {
    22         ReenterLockCondition tl = new ReenterLockCondition();
    23         Thread t1 = new Thread(tl);
    24         t1.start();
    25         Thread.sleep(2000);
    26         //通知线程t1继续执行
    27         lock.lock();
    28         condition.signal();
    29         lock.unlock();
    30     }
    31 }

    首先要构造一个和ReentrantLock相对应的Condition,使用lock的newCondition()工厂方法把condition构造出来,所以lock和condition绑定在一起,这个condition就是为这个lock而产生的。我在lock上去做lock()操作,我继续在condition上去做等待,这时当前线程被挂起,直到被signalled或者被中断。所以此处会报出一个中断异常。在主线程中,我们做了一个signal,通知condition.await()继续执行,你要唤醒一个线程,这个线程唤醒之后不是马上能往下执行的,在唤醒之后返回之前,必须重新拿到这把锁。

    3. API详解

    await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线程会重新获得锁并继续执行。或者当前线程中断时,也能跳出等待。这和Object.wait()方法很相似。

    awaitUninterruptibly()方法和await()方法基本相同。但是它并不会再等待过程中响应中断。

    signal()方法用于唤醒一个在等待中的线程。相对的signalAll()方法会唤醒所有在等待中的线程。这和Object.notify()方法很类似。

    (三)Semaphore

    1. 概述

    信号量是共享锁,允许多个线程同时进入临界区。

    信号量:对于锁来讲,是互斥的,排他的,意思是只要我进去了,没有人再能进去了。对临界区来讲,是绝对的严格的保护。当有一个线程进入到区间里面去,另外就不可能有线程进入到区间里面去。换句话说,信号量许可数量为1的就是一把锁。信号量是允许若干个线程进入到临界区里面去,但是超过许可范围的线程就必须等待。所以信号量可以认为是一个广义上的锁,也可以认为是一个共享锁,它可以有多个线程去共享使用临界区,比如说,信号量当中可以给它指定10个许可,那么每一个许可可以分配给若干个线程,当然了一个线程也可以拿两三个许可,但是一般来说,在绝大多数情况下,因为每个线程都相对来说比较公平一些,每一个线程拿一个,但是可以根据业务的需要,让每一个线程多拿几个许可。只要我还有富余的许可,比如说十个线程,有5个分出去了,还有5个许可可以拿。那么这个时候有新的线程进来,也会把这个线程分配给它,这时候它拿到了许可就可以进去执行,但是如果这时候许可分配完成了,我只有一个许可可以用了,那么后面的线程就和锁一样,必须等待。换句话说,信号量允许多个线程进入临界区,当信号量许可数量等于1的时候,就相当于一把锁。那么如果我们的系统负载的能力非常有限,可能只能同时处理十个请求的任务,超过十个,可能没有能力执行。这时候可以用信号量控制,当有10个任务进来的时候,可以做执行,超过十个,让它做等待。这是一种非常简单的使用系统的功能做控制。

    2. 主要接口

    (1)public void acquire();  //获得信号量

    (2)public void acquireUniterruptibly();  //获得信号量,不支持中断

    (3)public boolean tryAcquire();  //获得信号量,不会等待,拿到了返回true,拿不到返回false。

    (4)public boolean tryAcquire(long timeout, TimeUnit unit);  //获得信号量,有时长,时长内,拿到了返回true,拿不到返回false。

    (5)public void release();    //释放信号量

    (6)public void acquire(int permits);  //让一个线程拿多个信号量

     3. 示例代码

     1 import java.util.concurrent.ExecutorService;
     2 import java.util.concurrent.Executors;
     3 import java.util.concurrent.Semaphore;
     4 
     5 public class SemapDemo implements Runnable{
     6     final Semaphore semp = new Semaphore(5);
     7 
     8     @Override
     9     public void run() {
    10         try {
    11             semp.acquire();
    12             //模拟耗时操作
    13             Thread.sleep(2000);
    14             System.out.println(Thread.currentThread().getId()+":done!");
    15         } catch (InterruptedException e) {
    16             e.printStackTrace();
    17         } finally {
    18             semp.release();
    19         }
    20     }
    21     
    22     public static void main(String[] args) {
    23         ExecutorService exec = Executors.newFixedThreadPool(20);
    24         final SemapDemo demo = new SemapDemo();
    25         for(int i=0; i<20; i++) {
    26             exec.submit(demo);
    27         }
    28     }
    29 }

    信号量的使用也是一种对资源的分配,所以使用信号量,可以使我们很好的分配资源。

    (四)ReadWriteLock

    1. 概述

    ReadWriteLock是JDK5中提供的读写分离锁。

    读写锁:为什么会有读写锁,对于重入锁来讲,或者对于传统的synchronized锁来说,它是不分线程的功能的,只要你想进来,你就必须拿到线程的这把锁。但是有些时候读和写是两种不同的操作,它们最大的区别是写回修改数据,而读不会修改数据。因此我们可以想想一下,不加以区分,所有线程进来都必须加锁,对于性能来讲是一个很大的杀伤力。比如说两个线程进来都是读线程,这时候我们应该抱着一种比较开放的姿态去看待这个问题,都是读的线程你就不应该加锁,大家都应该能进去。但是如果有写线程进来,这时候写线程有可能修改数据,这时候读线程有可能产生数据不一致。所以当有写线程发生的时候,我们才需要加锁。因此从功能上来讲,我们将锁进行功能上的划分,使得我们的性能能够有很大的提高,并行度有很大的提高。因为无论ReentrantLock或者synchronized来讲,加锁之后,并行度就是1。只有一个线程能进去,这完全不符合高并发的概念。ReadWriteLock允许很多线程一起做Read。显然,ReentrantLock或者synchronized都属于阻塞并行,它会把线程挂起。但是这个ReadWriteLock如果没有Write线程发生的情况下,所有的Read操作都是无等待的并发,所以这个并发级别非常高的,因为所有的线程必然能够在有限的步骤内完成自己的功能。

     2. 访问情况

    读-读不互斥:读读之间不阻塞。

    读-写互斥:读阻塞写,写也会阻塞读。

    写-写互斥:写写阻塞。

     
    非阻塞 阻塞
    阻塞 阻塞

     

    3. 主要接口

    (1)private static ReentrantReadWriteLock readWriteLock = newReentrantReadWriteLock();

    (2)private static Lock readLock = readWriteLock.readLock();

    (3)private static Lock writeLock = readWriteLock.writeLock();

    (五)CountDownLatch

     1. 概述

    倒数计数器

    一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各种设备、仪器的检查,每一个检查都可以看做有一个单独的线程在执行。每一个线程又一个自己的检查任务,如果说一共比如有10个检查项,每当一个线程完成自己的检查任务之后,它就做一个countDown,倒数,自己的任务已经完成了,或者这个任务已经到达了自己的执行目标。当所有的线程都完成为了自己的任务后,计数器就会清零。最终等待在countDown上的主线程,以火箭发射为例,主线程就不会再等待,执行后面的事情。只有等所有检查完毕后,引擎才能点火。

    这种场景就非常适合使用CountDownLatch。它可以阻塞点火线程,等待所有检查线程全部完工后,再执行。

     2. 主要接口

    (1)static final CountDownLatch end = new CountDownLatch(10);

    (2)end.countDown();

    (3)end.await();

    3. 示意图

    主线程会在发射之前,这个临界点上做一个等待,它可能会先到这个地方,等待这个发射,其它的检查任务就分别执行,检查过程也可能花费一些时间,可能并不会马上就做完,因此主线程会在这里做等待,当所有的检查任务全部都到达临界点,全部都执行完毕后,主线程会在这个点上继续往下执行。所以这个CountDownLatch就可以看成一个简单的栅栏,整个线程按照时间执行上划了一条线,所有的线程都要到了那个终点为止,我们那个主线程继续往下走。

     1 import java.util.Random;
     2 import java.util.concurrent.CountDownLatch;
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 
     6 public class CountDownLatchDemo implements Runnable{
     7     static final CountDownLatch end = new CountDownLatch(10);
     8     static final CountDownLatchDemo demo = new CountDownLatchDemo();
     9     
    10     @Override
    11     public void run() {
    12         try {
    13             Thread.sleep(new Random().nextInt(10)*1000);
    14             System.out.println("check complete");
    15             end.countDown();
    16         } catch (InterruptedException e) {
    17             // TODO Auto-generated catch block
    18             e.printStackTrace();
    19         }
    20     }
    21     
    22     public static void main(String[] args) throws InterruptedException {
    23         ExecutorService exec = Executors.newFixedThreadPool(10);
    24         for(int i = 0; i < 10; i++) {
    25             exec.submit(demo);
    26         }
    27         //等待检查
    28         end.await();
    29         System.out.println("Fire");
    30         exec.shutdown();
    31     }
    32 }

    CountDownLatch构造了一个10,也就是有十个线程或者十个检查任务,是需要在主线程开始之前,必须先跑完的。如果没有跑完,这个主线程就会一直等待。这里把检查任务简单做成sleep形式,每个检查任务都会sleep若干时间,来模拟一个检查。检查完毕后就会打印出check complete。做一个countDown,表示自己已经完毕,做10个countdown之后,使得主线程往下走。在main方法中,先开启十个线程,每个线程都会去做这个run,等到十个全部都跑完之后,把countDown都减掉之后,我们这个await才会返回,之后才会发射火箭。

    这个业务场景实际中非常普遍,我们每一个线程都要等待其它的准备任务完成后才可以做,这个准备任务怎样才可以通知主线程已经完成了呢,我们就可以用这个countDownLatch去做这个通知,不管有几个先序任务,都可以把它包装在这个countDownLatch上面。这是一个方便大家使用的工具类。

    (六) CyclicBarrir

    1. 概述

    循环栅栏

    Cyclic意为循环,也就是说这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么第一批十个线程后,计数器就会归零,然后接着凑齐下一批十个线程。

    循环栅栏:和CountDownLatch都表示在线程的执行线上横切了一条线,让线程都在那个时间点上做一个等待,cyclic表示循环,CountDownLatch只有一次计数,到了多线程归零之后主线程就可以往下走了,到了这个时候CountDownLatch就算使用结束了。Cyclic可以反复使用,可以一批批地去执行。比如说我要做十个线程,到了执行完毕,主线程就工作一次,我二批十个线程执行完毕后,主线程再工作一次,第三批执行完毕后,主线程再工作一次。。。相当于一个循环的姿态,或者一个栅栏。

    2. 主要接口

    public CyclicBarrier(int parties, Runnable barrierAction)  //parties参与者,相当于CountDownLatch的参数。多少个先序任务 。

    barrierAction就是当计数器一次计数完成后,系统会执行的动作。

    await()

    3. 示意图

    有士兵做集合,当有一个士兵到了之后,你不能叫集合完成,当有一个士兵到了之后,要等其它的士兵全部到达才叫集合完毕。所以第一个到的士兵要等其它士兵到达。到达完成后,我们下达任务,所有的士兵都会去完成自己的任务,当有一个士兵任务完成的时候,并不能表明我们的任务总体执行完成,要等到所有士兵的任务完成,才能说任务完成。这两次到达,其实都可以复用同一个CyclicBarrier,所以下一次计数和本一次计数都复用同一个实例,所以说它是一个实例循环复用的过程。等到十个集合完毕,等到十个任务完成,都用同一个实例做就可以了。

    4. 代码示例

     1 import java.util.Random;
     2 import java.util.concurrent.BrokenBarrierException;
     3 import java.util.concurrent.CyclicBarrier;
     4 
     5 public class CyclicBarrierDemo {
     6     public static class Soldier implements Runnable{
     7         private String soldier;
     8         private final CyclicBarrier cyclic;
     9         
    10         Soldier(CyclicBarrier cyclic, String soldierName){
    11             this.cyclic = cyclic;
    12             this.soldier = soldierName;
    13         }
    14         
    15         @Override
    16         public void run() {
    17             try {
    18                 //等待所有士兵到齐
    19                 cyclic.await();
    20                 doWork();
    21                 //等待所有士兵完成工作
    22                 cyclic.await();
    23             } catch (InterruptedException | BrokenBarrierException e) {
    24                 // TODO Auto-generated catch block
    25                 e.printStackTrace();
    26             }
    27         }
    28         
    29         void doWork() {
    30             try {
    31                 Thread.sleep(Math.abs(new Random().nextInt()%10000));
    32             } catch (InterruptedException e) {
    33                 // TODO Auto-generated catch block
    34                 e.printStackTrace();
    35             }
    36             System.out.println(soldier + ": 任务完成");
    37         }
    38     }
    39     
    40     public static class BarrierRun implements Runnable {
    41         boolean flag;
    42         int N;
    43         public BarrierRun(boolean flag, int N) {
    44             this.flag = flag;
    45             this.N = N;
    46         }
    47         @Override
    48         public void run() {
    49             if(flag) {
    50                 System.out.println("司令:【士兵" + N + "个,任务完成!】");
    51             } else {
    52                 System.out.println("司令:【士兵" + N + "个,集合完毕!】");
    53                 flag = true;
    54             }
    55         }
    56     }
    57     
    58     public static void main(String args[]) {
    59         final int N = 10;
    60         Thread[] allSoldier = new Thread[N];
    61         boolean flag = false;
    62         CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
    63         //设置屏障点,主要是为了执行这个方法
    64         System.out.println("集合队伍!");
    65         for(int i = 0; i < N; ++i) {
    66             System.out.println("士兵" + i + "报道");
    67             allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
    68             allSoldier[i].start();
    69 //            if(i == 5) {
    70 //                allSoldier[0].interrupt();
    71 //            }
    72         }
    73     }
    74 }

    说明:await()会抛出一个中断异常,如果被中断了就不会继续往下走了。其实很多像类似于wait或者await等待的函数都可能会抛出中断异常,目的就是陷入了长时间的等待,要提供一些方法把线程唤醒,避免永久性卡死的现象。另外还会抛出BrokenBarrierException,因为此处有10个参与者,也就是构建CyclicBarrier时有十个参与者,只有十个参与者都到了,才有可能往下走,await的线程才有可能往下走。如果当中有一个线程出现了某些意外,可能被中断等,导致它永远踩不到cyclicbarrier上面去,这样就使得其它等待在那边的线程都无法往下走,如果遇到这种情况,另外几个走不下去的线程就会抛出BrokenBarrierException,表示自己看起来是永远没有希望被执行下去,被中断的线程会抛出InterruptedException。

    可以看出CyclicBarrier和CountDownLatch语义上是有不同的,CountDownLatch是一个线程等待其它所有线程去做操作,CyclicBarrier可以让多个线程做一个相互等待,并且它的功能非常强大,它可以做一个循环的复用,我可以有一组一组不停的去使用同一个实例。

    (七) LockSupport

    1. 概述

    锁支持

    提供线程阻塞原语

    它可以把一个线程挂起,有点类似于suspend(),但是跟suspend()不一样。

    LockSupport思想有点类似于信号量,内部会产生许可之类的东西,park的时候就相当于拿掉了它的许可,unpark的时候相当于申请了这个许可。因此有一个特点是unpark发生在park之前,park并不会把线程阻塞住。这点和suspend不同。

    2. 主要接口

    (1)LockSupport.park();  //把线程挂起

    (2)LockSupport.unpark(t1);  //把线程继续往下执行

    3. 与suspend()比较

    不容易引起线程冻结

    LockSupport可以毫无顾忌的使用,suspend是不建议使用,将来可能废除的方法。

    4. 中断响应

    能够响应中断,但不会抛出异常。

    中断响应的结果是,park()函数的返回,可以从Thread.interrupted()得到中断标志。

    5. 代码示例

     1 import java.util.concurrent.locks.LockSupport;
     2 
     3 public class LockSupportDemo {
     4     public static Object u = new Object();
     5     static ChangeObjectThread t1 = new ChangeObjectThread("t1");
     6     static ChangeObjectThread t2 = new ChangeObjectThread("t2");
     7     
     8     public static class ChangeObjectThread extends Thread {
     9         public ChangeObjectThread(String name) {
    10             super.setName(name);
    11         }
    12         
    13         @Override
    14         public void run() {
    15             synchronized (u) {
    16                 System.out.println("in" + getName());
    17                 LockSupport.park();
    18             }
    19         }
    20     }
    21     
    22     public static void main(String[] args) throws InterruptedException {
    23         t1.start();
    24         Thread.sleep(100);
    25         t2.start();
    26         LockSupport.unpark(t1);
    27         LockSupport.unpark(t2);
    28         t1.join();
    29         t2.join();
    30     }
    31 }

    unpark发生在park之前,park是挂不住的。使用suspend和resume,这时线程处于永久性挂起状态。但是使用park和unpark就不会有这个问题。就算unpark发生在park之前,也没事,只要unpark过,就相当于把许可拿掉了,park再把许可加上去。一加一减,最终还是零,不会把线程阻塞住。

    (八)ReentrantLock的实现

    重入锁是一个应用层的实现,而不是系统层的实现。从本质来说都是java实现。它的实现有以下三个内容比较重要

    1. CAS状态

    判断这个锁有没有被人占用。比如说0表示没有人占用,1表示有人占用。在进入之前,通过CAS方法,试图改成1,看看能不能改成功,如果改成功了,说明原来是0,表示拿到了这个锁,如果改不成功,意味着原来不是0,意味着别人已经拿走了这个锁,这时候我没有拿到锁,就是这么一个思路。CAS状态修改是实现重入锁的一个关键,锁的实现内部本质就是一个CAS操作,用它来修改某一个变量,看看这个变量能不能修改成功,通过它是否修改成功,来反推我是不是应该拿到了这把锁。

    2. 等待队列

    如果我没有拿到锁,我这个线程应该怎么办呢,我就应该进入一个等待的队列。如果有多个线程进来,比如有一个线程拿到锁后进来,在里面做了很多事情,一直不释放,导致有多个线程进来,那么多个线程进来之后,就应该在这个队列中排队,所以ReentrantLock内部必然要维护一个等待队列,把所有等待在锁上的线程都保存起来,那么等待在队列中的线程要做什么事情呢,做的就是park操作。

    3. park()

    只要进入到等待队列中的线程都要做park把它挂起,那么什么时候把它unpark,让它继续执行,只有当前面的线程把锁unlock的时候,让我就在等待队列中挑一个出来,做unpark操作。

    4. 源码讲解

    如果更新成功,就表示拿锁成功,lock马上返回。拿锁不成功,首先要try一下,看看能不能成功,如果成功马上返回。如果失败,要把自己加到等待队列中去(addWaiter),等待队列中的节点都是node,每一个node节点必然包含每一个线程,node是对线程的包装,这样我顺着节点就能把等待在线程上的节点都能拿出来,通过一些操作,把node入队,并且把当前节点做一个返回。返回的节点会被调到acquireQueued函数中,再次尝试拿锁,最终拿不到走到parkAndCheckInterrupt,把它挂起。挂起之前会做一些判断。这是锁的内部的队列的维护。

    二、并发容器及典型源码分析

    (一)集合包装

    1. HashMap

    Collections.synchronizedMap

    public static Map m = Collections.synchronizedMap(new HashMap())

    HashMap并不是线程安全的,如果在多线程中使用HashMap会导致一些稀奇古怪的现象。最简单的把HashMap变成线程安全的方式是使用Collections.synchronizedMap,进行一个包装。这种方式只适合并发量非常小的情况。

    分析代码:

    synchronizedMap会把Map对象包装在synchronizedMap里面,之后,这个map的每一个操作,都会做一个同步,从而使得这个Map变成一个线程安全的包装。但是它的问题是这样做会使得get和put等等操作会变成一个串行的操作,get等待put,put等待get,get和get之间等待,如果并发量很高,那么其实线程一个个去做这个事情,所以这是一个并行的解决方案并不是一个高并发的解决方案。

    2. List

    synchronizedList

    同上

    3. Set

    synchronizedSet

    同上

    (二)ConcurrentHashMap

    高性能的高并发的解决方案。

    使用同HashMap。

    HashMap的实现:

    HashMap的内部实现其实是一个数组,数组里面放的是entry,里面有key,value,next。

    HashMap首先是一个数组,数组里面有一些表项,每一个表项放的是entry,某一个entry里面包含key和value。当一个key进来后,放到数组的哪一个槽位是通过hash算法得到的,比如说我们定位到第三个槽位中,这个槽位key和value都有了,所以把key映射到这个槽位当中,然后把entry拿出来,然后去拿到它的value。在entry当中有一个next,这个next是干什么用的,因为我们没有办法保证每一个key都能独立的映射到单独的槽位当中,换句话说,两个不同的key(key和key1),有可能会被映射到同一个槽位里,这种情况称为hash冲突,这种一种解决方案,就是放到同一个槽位中,一个entry数组的槽位中,我们怎么放两个entry呢,我这个entry中有一个next,我就把这个槽位的next指向key1所在的entry,所以这里变成了一个链表。所以HashMap的整体结构内部基本主要实现是一个数组,数组中放的是entry,每一个entry都是链表中的一环,链表的头部,所以当HashMap发生大量Hash冲突的时候,它就退化成一个链表,这个就是HashMap的基本实现。一般来说,HashMap也不会放满,因为放满之后必然产生冲突,所以HashMap会预留一些空间,冲突是我们不想看见的,一旦一个HashMap发生冲突之后,性能就会降低。

    ConcurrentHashMap的实现

    ConcurrentHashMap从主体思想上来说,和HashMap是一致的。

    jdk1.7实现ConcurrentHashMap实现方式:

    jdk1.7中采用Segment+HashEntry的方式进行实现。

    put操作:

    当你put一个Key和Value进去的时候,你会看到在ConcurrentHashMap中有一个Segment,这个是段。因为现在要构造一个高并发的HashMap并不是一个普通的HashMap,所有当有大量线程产生的时候,是不是意味着大量的线程要一个个进入HashMap中做元素的赋值操作。但是有一种方式使得我们可以不这样做,就是有一个大的HashMap,我们可以切割成若干个小的HashMap。当前一个线程进来后,先把一个线程映射到一个小的HashMap中去,我在一个小的HashMap中做一个普通的HashMap该做的事情,假如说我有16个小的HashMap,那就意味着我一个大的HashMap同时接受16个线程的访问,相比如之前只能接受一个线程的访问,理想情况下性能提高了16倍。这个小的HashMap就是Segment,这是Segment也是一个Key和Value对,所以当put一个Key的时候,首先是通过hash拿到一个j,拿到j之后就把segment拿出来了,把j个对象的地址偏移量算出来之后把它拿出来,segment是ConcurrentHashMap存放数据的主要地方,它是把一个大的HashMap拆分成若干个小的Segment。然后拿到之后你要保证这个s是存在的,因为s有可能不存在,如果不存在就要把它创建出来。使用第0个segment作为原型,把第k个segment创建出来把它设进去。拿出来之后,就要把key和value放到s中去。不同的线程对应的s未必是一样的,这样减少了多个线程冲突。这个put是线程安全的,但是没有使用任何锁。

    这个put就把key和value塞到segment里面去,这个里面有一个tryLock(),Segment继承于ReentrantLock,重入锁。所以tryLock就是重入锁tryLock,tryLock就是做CAS操作。如果成功,node就是null。成功之后,因为用的是tryLock ,所以不会去做一个等待,lock会等待,但是tryLock不会等待,没有线程阻塞的问题。然后我会去拿到我放到segment哪一个槽位上面。我把这个槽位算作是index,映射到这个index上,然后把这个index上第一个给拿出来。然后我尝试把key和value对插入到数组里面去。e!=null表示有hash冲突,这时候要做的要把e和当前的entry串起来,做一个链表,如果没有hash冲突,就把entry构造出来,把它set到数组当中去就完成了。如果有存在的情况做的是值的覆盖不是插入操作。如果tryLock成功,不会有挂起,最终unlock。如果tryLock不成功,走到scanAndLockForPut函数,做一些其它的事情。不成功的时候,表示别人先我一步拿到锁进去插入数据了,结果发生在segment段上的一个冲突。所以我会在这个地方,再去tryLock操作,因为也许在这个时候,别人已经把锁释放掉了,所以再次tryLock,此处是while循环,只有当tryLock成功,才会退出。从这里可以看到一点,虽然在这里要做一个多线程之间的同步操作,但是这里没有简单的使用ReentrantLock.lock()去把线程直接挂起,而是不停地使用tryLock操作,去尝试拿锁,拿不到在应用层处理,而tryLock里面是一个CAS的实现,tryLock如果不成功,这里有一个try的次数,如果try的次数超过最大尝试次数,才会去做lock操作,有可能把线程挂起掉。但是尽量不走到这里去,因为我不停的try,每try一次就次数加1,这是一个自旋,不停地尝试,进行若干尝试还不成功,才把自己挂起。在最后做了一个操作,如果entry的hash拿出来,跟前面的first不一样,表示我这个当前这个时候,我这个线程做了一个重哈希的操作,比如容量不够了,把容量扩展,如果发现它做了这么一些操作之后,那么我就把重试次数设置为-1,再去重试。从这里学习lock的使用,并不要把这个线程简单的lock住,而是在tryLock之后,实在不行,我们再去lock这个线程。rehash是当空间大于阈值的时候的操作,rehash会把空间翻倍,并且把node加进去,rehash是一个比较耗时的操作,它也做了一些优化,尽量不去新建元素,尽量重入现有元素,原因是虽然我把容量翻倍了,同样的一个hash在容量翻倍前后很有可能处于同一个位置,基于这样一个原因,它尽量重入元素而不是新建,只有当没有办法的时候才会新建。元素新建的操作不一定会走到,只有当hash在扩容后位置发生了改变,它才会进入新建操作。因为hash的空间只有2的指数次方,所以偏移量要么为0,要么为2的指数次方偏移量。那么从这里解释了ConcurrentHashMap的实现。get方法通过key把segment拿出来,再把entry拿出来,再把key相同的value拿出来,内部完全无锁操作。

    有一个地方需要注意就是size,这个hash表中一共有多少个数据,这个时候可以看到有一个lock,这个lock在一个循环里面,有多少个段,就有多少个lock。同样道理,在unlock的时候一样,有多少个段要做多少个unlock。要做全局的数据统计的时候,我这个size是要把所有的segment数据都要加起来,如果在这个时候,当前的数据还在被其它线程不断修改的时候,那我这个size的统计是没有办法统计出来的。所以我在统计之前,我再把segment的值全部拿到,然后再做这个数据的统计,再做累加,再把锁释放掉。这里是把锁分离之后带来的一点点的小的弊端,当你要取得全局信息的时候,要把所有的锁都拿到。但是这个size操作不太可能被高频率调用,所以也是可以接受的。

    (三)BlockingQueue

    阻塞队列:是一个接口,并不是一个实际的类。

    阻塞队列是线程安全的,但是并不是高性能的并发容器。也就跟ConcurrentHashMap不同,就是为性能服务的,就是在多线程环境下,得到最好的性能。BlockingQueue性能并不好。但是为什么它重要,因为它是一个非常好的在多线程中共享数据的容器。他有一个好处,如果一个队列为空,如果试图往里面去读一个数据的时候,读的线程就会做一个等待,等到有另外一个线程往里面写数据的时候,读线程就会被唤醒,并且拿到这个值。换种情况,如果这个队列为满,如果你还想往这个队列里面写数据,写的线程就会等待,等到有人把数据拿掉,写线程才会被唤醒,往队列里面写数据。BlockingQueue会引起线程的阻塞。

    BlockingQueue的主要实现有以下几个类:

    ArrayBlockingQueue内部使用了非常重要的工具ReentrantLock和Condition,ReentrantLock用来进行访问控制,保证它的线程安全,Condition用来做通知,如果要来拿数据,但是这时候数据为空,所以要等到有数据的时候NotEmpty通知要拿数据的线程,又或者要写数据,这时候位置为满,通知notFull通知数据不满了,可以写数据了。

    put操作做了加锁,如果随随便便加锁,一定不会是高性能的,像ConcurrentHashMap会先在应用层面做一个自旋等待,不行了再去加锁。put操作的时候首先判断是不是满,如果满要等,take操作会通知NotFull,通知等待线程进行插入操作。同理,insert的时候通知NotEmpty。

    BlockingQueue比较适合生产者消费者模式。

    (四)ConcurrentLinkedQueue

    高性能的队列。

    内部使用大量无锁的算法来做的,很少有挂起线程的操作。 

  • 相关阅读:
    String类型作为方法的形参
    [转] 为什么说 Java 程序员必须掌握 Spring Boot ?
    Centos打开、关闭、结束tomcat,及查看tomcat运行日志
    centos中iptables和firewall防火墙开启、关闭、查看状态、基本设置等
    防火墙没有关导致外部访问虚拟机的tomcat遇到的问题和解决方法
    可以ping通ip地址,但是访问80,或者8080报错
    JAVA的非对称加密算法RSA——加密和解密
    CA双向认证的时候,如果一开始下载的证书就有问题的,怎么保证以后的交易没有问题?
    图解HTTPS协议加密解密全过程
    https单向认证服务端发送到客户端到底会不会加密?
  • 原文地址:https://www.cnblogs.com/upyang/p/12343191.html
Copyright © 2011-2022 走看看