并发是伴随着多核处理器的诞生而产生的,为了充分利用硬件资源,诞生了多线程技术。但是多线程又存在资源竞争的问题,引发了同步和互斥,并带来线程安全的问题。于是,从jdk1.5开始,引入了concurrent包来解决这些问题。
java.util.concurrent 包是专为 Java并发编程而设计的包。
在Java中,当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么称这个类是线程安全的。
一般来说,concurrent包基本上由有3个package组成 :
java.util.concurrent:提供大部分关于并发的接口和类,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等 ; java.util.concurrent.atomic:提供所有原子操作的类, 如AtomicInteger, AtomicLong等; java.util.concurrent.locks:提供锁相关的类, 如Lock, ReentrantLock, ReadWriteLock, Condition等。
concurrent包下的所有类可以分为如下几大类:
locks部分:显式锁(互斥锁和速写锁)相关,如ReentrantLock,ReentrantReadWriteLock等; atomic部分:原子变量类相关,是构建非阻塞算法的基础,如AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference等; executor部分:线程池相关,如ExecutorService,Callable,Future等; collections部分:并发容器相关,如BlockingQueue,Deque,ConcurrentMap等; tools部分:同步工具相关,如CountDownLatch,CyclicBarrier,Semaphore,Executors,Exchanger等。
JUC的类图结构如下所示:
concurrent包的优点有:
①功能丰富,诸如线程池(ThreadPoolExecutor),CountDownLatch等并发编程中需要的类已经有现成的实现,不需要自己去实现一套; 相比较而言,jdk1.4对多线程编程的主要支持几乎只有Thread, Runnable,synchronized等。synchronized和JDK5之后的Lock均是悲观锁(悲观锁一般是一个人在使用的时候,另一个人不能用,所以性能极低,所能支持的并发量就不高)。
②concurrent包里面的一些操作是基于硬件级别的CAS(compare and swap,比较再赋值),就是在cpu级别提供了原子操作,简单的说就是可以提供无阻塞、无锁定的算法; 而现代cpu大部分都是支持这种算法的。JUC(java.util.concurrent)是基于乐观锁的,既能保证数据不混乱,又能保证性能。
version-(版本管理)就是基于乐观锁机制-->拿着我们期望的结果,和现有结果进行比对,如果是相同的,就赋值,如果不是相同的,就重试。 CAS:有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。 CAS算法内部是通过JNI--native方法来实现: 由java底层的C语言或者C++实现。
一般情况下:同步容器是有使用价值的。有时候,我们的异步容器,比如ArrayList,在并发环境下,会有这些问题:①数据紊乱;②java.util.ConcurrentModificationException。这都是对于集合的读写状态不一致造成的问题。
我们可以这样构建同步容器:
Collections.synchronizedList(new ArrayList<>()); Collections.synchronizedMap() Collections.synchronizedSet()
上面的方法可以实现同步容器,但是使用了悲观锁,因而效率不高。
使用JUC体系中提供的容器,如:ConcurrentHashMap,则有这样的优势:①不会出现同步问题,数据是正常的;②速度相对较快 ,就算没有hashmap快,也比hashtable快的多。其实,ConcurrentHashMap内部也是有同步的,在这方面面和hashtable没有区别。那么,ConcurrentHashMap快在哪里?主要是因为,其内部划分了很多segment区域,当不同的线程操作不同的segment的时候,其实还是一个异步操作;只有当不同线程操作同一个segment的时候,才会发生同步操作,所以速度很快。一个ConcurrentHashMap内部最多能有16个segment。
我们接下来看一个非常有用的类CountDownLatch, 它是一个可以用来在一个进程中等待多个线程完成任务的类。在此给出一个应用场景:某个主线程接到一个任务,起了n个子线程去完成,但是主线程需要等待这n个子线程都完成任务以后才开始执行某个操作。详见代码:
package com.itszt.test1; import java.util.concurrent.CountDownLatch; /** * 主线程会在启动的子线程完全结束后再继续执行 */ public class Test { public static void main(String[] args) { Test test = new Test(); test.demoCountDown(); } public void demoCountDown() { int count = 10; final CountDownLatch l = new CountDownLatch(count); for (int i = 0; i < count; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().sleep(2 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread -" + index + "- has finished..."); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("now all threads have finished"); } }
执行结果如下所示:
thread -9- has finished... thread -8- has finished... thread -0- has finished... thread -4- has finished... thread -1- has finished... thread -2- has finished... thread -5- has finished... thread -6- has finished... thread -3- has finished... thread -7- has finished... now all threads have finished
接下来,我们再看下Atomic相关的类, 比如AtomicLong, AtomicInteger等。简单来说,这些类都是线程安全的,支持无阻塞无锁定的。
package com.itszt.test1; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; /** * 测试AtomicLong与long */ public class AtomicTest { public static void main(String[] args) { AtomicTest test = new AtomicTest(); test.testAtomic(); } public void testAtomic() { final int loopcount = 10000; int threadcount = 10; final NonSafeSeq seq1 = new NonSafeSeq(); final SafeSeq seq2 = new SafeSeq(); final CountDownLatch l = new CountDownLatch(threadcount); for (int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < loopcount; ++j) { seq1.inc(); seq2.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("NonSafeSeq:" + seq1.get()); System.out.println("SafeSeq with atomic: " + seq2.get()); } class NonSafeSeq { private long count = 0; public void inc() { count++; } public long get() { return count; } } class SafeSeq { private AtomicLong count = new AtomicLong(0); public void inc() { count.incrementAndGet(); } public long get() { return count.longValue(); } } }
上述代码执行如下:
finished : 0 finished : 3 finished : 2 finished : 6 finished : 9 finished : 5 finished : 8 finished : 1 finished : 4 finished : 7 both have finished.... NonSafeSeq:98454 SafeSeq with atomic: 100000
其中,NonSafeSeq是作为对比的类,直接放一个private long count不是线程安全的,而SafeSeq里面放了一个AtomicLong,是线程安全的;可以直接调用incrementAndGet来增加。通过上述执行结果可以看到,10个线程,每个线程运行了10,000次,理论上应该有100,000次增加,使用了普通的long是非线程安全的,而使用了AtomicLong是线程安全的。需要注意的是,这个例子也说明,虽然long本身的单个设置是原子的,要么成功要么不成功,但是诸如count++这样的操作就不是线程安全的,因为这包括了读取和写入两步操作。
在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上。
concurrent包提供了一个可以替代synchronized关键字的ReentrantLock,简单的说,你可以new一个ReentrantLock, 然后通过lock.lock和lock.unlock来获取锁和释放锁;需要注意的是,必须将unlock放在finally块里面。reentrantlock的好处有
:
①更好的性能;
②提供同一个lock对象上不同condition的信号通知;
③还提供lockInterruptibly这样支持响应中断的加锁过程,意思是说你试图去加锁,但是当前锁被其他线程hold住,然后你这个线程可以被中断。
package com.itszt.test1; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; /** * 测试ReentrantLock */ public class ReentrantLockTest { public static void main(String[] args) { ReentrantLockTest lockTest = new ReentrantLockTest(); lockTest.demoLock(); } public void demoLock() { final int loopcount = 10000; int threadcount = 10; final SafeSeqWithLock seq = new SafeSeqWithLock(); final CountDownLatch l = new CountDownLatch(threadcount); for (int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < loopcount; ++j) { seq.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("SafeSeqWithLock:" + seq.get()); } class SafeSeqWithLock { private long count = 0; private ReentrantLock lock = new ReentrantLock(); public void inc() { lock.lock(); try { count++; } finally { lock.unlock(); } } public long get() { return count; } } }
上述代码执行如下:
finished : 5 finished : 3 finished : 1 finished : 8 finished : 0 finished : 6 finished : 4 finished : 2 finished : 7 finished : 9 both have finished.... SafeSeqWithLock:100000
上述代码操作中,通过对inc操作加锁,保证了线程安全。
concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLock。
A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing.
The read lock may be held simultaneously by multiple reader threads, so long as there are no writers.
The write lock is exclusive.
上述英文意思是说:读锁可以有很多个锁同时上锁,只要当前没有写锁; 写锁是排他的,上了写锁,其他线程既不能上读锁,也不能上写锁;同样,需要上写锁的前提是既没有读锁,也没有写锁。
package com.itszt.test1; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 测试读写锁 */ public class RWLockTest { public static void main(String[] args) { RWLockTest lockTest = new RWLockTest(); lockTest.testRWLock_getw_onr(); } public void testRWLock_getw_onr() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final Lock rlock = lock.readLock(); final Lock wlock = lock.writeLock(); final CountDownLatch l = new CountDownLatch(2); // start r thread,开启读锁 new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " now to get rlock--获取读锁"); rlock.lock(); try { Thread.currentThread().sleep(2 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " now to unlock rlock--释放读锁"); rlock.unlock(); l.countDown(); } }).start(); // start w thread,开启写锁 new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " now to get wlock--获取写锁"); wlock.lock(); System.out.println(Thread.currentThread().getName() + " now to unlock wlock--释放写锁"); wlock.unlock(); l.countDown(); } }).start(); try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " finished"); } }
上述代码执行如下:
Thread-0 now to get rlock--获取读锁 Thread-1 now to get wlock--获取写锁 Thread-0 now to unlock rlock--释放读锁 Thread-1 now to unlock wlock--释放写锁 main finished
ReadWriteLock的实现是ReentrantReadWriteLock,有趣的是,在一个线程中,读锁不能直接升级为写锁,但是写锁可以降级为读锁;这意思是说,如果你已经有了读锁,再去试图获得写锁,将会无法获得, 一直堵住了;但是如果你有了写锁,再去试图获得读锁,就没问题。
下面是一段写锁降级的代码:
public void testRWLock_downgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }
上述代码在main函数中执行后,结果如下:
now to get wlock now to get rlock now to unlock wlock now to unlock rlock finished
我们再看一段读锁升级的代码:
public void testRWLock_upgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }
上述代码执行中,已经有了读锁,再去试图获得写锁,将会无法获得, 程序一直堵塞,进入死锁状态,显示如下:
now to get rlock now to get wlock
另外,CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
CyclicBarrier也是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。
CountDownLatch和CyclicBarrier都是Synchronization aid,即“同步辅助器”,既然都是辅助工具,在使用中有什么区别,各自的使用场景如何?
CountDownLatch场景举例:一年级期末考试要开始了,监考老师发下去试卷,然后坐在讲台旁边玩着手机等待着学生答题,有的学生提前交了试卷,并约起打球了,等到最后一个学生交卷了,老师开始整理试卷,贴封条,下班,陪老婆孩子去了。 启发:CountDownLatch很像一个倒计时锁,倒计时结束,另一个线程才开始执行。就如监考老师要结束监考工作,必须等待所有学生都交了试卷,监考工作才能进入结束环节。 CyclicBarrier场景举例:公司组织户外拓展活动,帮助团队建设,其中最重要的一个项目就是要求全体员工(包括女同事,BOSS,一个都不能少)都能翻越一个高达四米,而且没有任何抓点的高墙,才能继续进行其他项目。 启发:CyclicBarrier可以看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。