并发包
Java中还有一套并发工具包,位于包java.util.concurrent下,里面包括很多易用
且很多高性能的并发开发工具。
一、原子变量和CAS
为什么需要原子变量,因为对于例如count++这种操作,使用
synchronized成本太高了。Java并发包的基本原子变量有:
AtomicBoolean、AtomicInteger、AtomitLong和AtomicReference(原子引用类型)
一)AtomicInteger
1.用法
构造方法:
public AtomicInteger(int initialValue) public AtomicInteger() //初始值为0
get/set方法:
public final int get() public final void set(int newValue)
之所以称为原子变量,是因为它们包含一些以原子方式实现组合操作的方法。部分方法如下:
//以原子方式获取旧值并设置新值 public final int getAndSet(int newValue) //以原子方式获取旧值并给当前值加一 public final int getAndIncrement() //以原子方式获取旧值并给当前值减一 public final int getAndDecrement() //以原子方式获取旧值并给当前值加delta public final int getAndAdd(int delta) //以原子方式给当前值加一并获取新值 public final int incrementAndGet() //以原子方式给当前值减一并获取新值 public final int decrementAndGet() //以原子方式给当前值加delta并获取新值 public final int addAndGet(int delta)
以上方法都依赖于方法:
/** * @param expect 如果当前值等于expect,则更新当前值为update * @param update 更新后的值 * @return 更新成功返回true,否则返回false * */ public final boolean compareAndSet(int expect, int update)
compareAndSet是一个非常重要的方法,比较并设置,我们以后简称为CAS。
应用示例:
public class AtomicIntegerDemo { private static AtomicInteger counter = new AtomicInteger(0); static class Vistor extends Thread { @Override public void run() { for (int i = 0; i < 1000; i++) { counter.incrementAndGet(); } } } public static void main(String[] args) throws InterruptedException { int num = 1000; Thread[] threads = new Thread[num]; for (int i = 0; i < num; i++) { threads[i] = new Vistor(); threads[i].start(); threads[i].join(); } System.out.println(counter.get()); //1000000 } }
2.基本原理和思维
主要内部成员:
private volatile int value; //volatile可保证内存可见性
public final int incrementAndGet() { for(;;) { //获取当前值 int current = get(); //计算更新值 int next = current + 1; //如果更新没有成功,说明value被别的线程改了,则再去取最新值 //并尝试更新直到成功为止 if(compareAndSet(current, next)) return next; } }
原子变量的更新逻辑是非阻塞式,更新冲突的时候,它就重试,
不会阻塞,不会有上下文切换开销。对于大部分比较简单的操作,
这种方式性能都远高于使用阻塞的方式。
接下来是compareAndSet实现源代码:
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
其中usafe是sun.misc.Unsafe类型,定义为:
private static final Unsafe unsafe = Unsafe.getUnsafe();
一般应用程序不应该直接使用。原理上,一般的计算机系统都在硬件层次上直接支持CAS指令,
Java的实现都会利用这些特殊的指令。
3.实现锁
基于CAS,除了可以实现乐观非阻塞外,还可以实现悲观阻塞式算法,比如锁。
实际上,Java并发包的所有阻塞式工具、容器、算法都是基于CAS的(不过,也需要一些别的支持)。
我们自己来实现一个简单的锁:
/** * 一般而言这种阻塞方式过于消耗CPU,实际开发中应该使用并发包中的类 * 这里只是一个演示 */ public class MyLock { //status表示锁的状态,0表示未锁定,1表示锁定 private AtomicInteger status = new AtomicInteger(0); public void lock() { //更新成功后才退出 while (!status.compareAndSet(0, 1)){ Thread.yield(); } } public void unlock() { status.compareAndSet(1, 0); } }
4.ABA问题
假设当前值为A,如果另一个线程先将A修改成B,再修改A,
当前线程的CAS操作无法分辨当前值发生变化。解决办法
是使用AtomicStampedReference,在修改值的同时附件一个时间戳,
只有值和时间戳都相同才能进行修改,其CAS声明方法为:
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
Pair pair = new Pair(100, 200); int stamp = 1; AtomicStampedReference<Pair> pairRef = new AtomicStampedReference<Pair>(pair, stamp); int newStamp = 2; pairRef.compareAndSet(pair, new Pair(200, 200), stamp, newStamp);
二、显式锁
Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类如下:
1)锁接口Lock,主要实现类是ReentrantLock;
2)读写锁和接口:ReadWriteLock,主要实现类是ReentrantReadWriteLock;
一)接口Lock
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
1)lock()/unlock():获取锁和释放锁,lock会阻塞直到成功。
2)lockInterruptibly():与lock()不同的是,它可以响应中断,如果被其他线程中断了,则抛出InterruptedException.
3)tryLock():只是尝试获取锁,立即返回,不阻塞,如果成功,返回true,否则返回false。
4)tryLock(long time, TimeUnit unit):在限定时间内等待,如果成功返回true,否则阻塞,在等待同时响应中断,超出时间没有获得锁,抛出false。
5)newCondition():
二)可重入锁ReentrantLock
1.基本用法
该类方法lock()和unlock()实现了synchronized一样的语义,但:
1)可重入,一个线程在持有锁的情况下,可以继续获得该锁;
2)可以解决竞态条件问题;
3)可以保证内存可见性;
ReentrantLock有两个构造方法:
//默认为不公平锁 public ReentrantLock() //设定true为公平锁 public ReentrantLock(boolean fair)
所谓公平是指,等待时间最长的线程优先获得锁。保证公平会英雄性能,一般也不需要,所以默认不保证。
public class Counter { private final Lock lock = new ReentrantLock(); private volatile int count; public void incr() { lock.lock(); try { count ++; //注意一定不要忘记释放锁 } finally { lock.unlock(); } } public int getCount() { return count; } }
2.使用tryLock()避免死锁
在持有一个锁同时获取另一个锁而获取不到的时候,可以释放
已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。
账户之间转账的例子:
public class Account { private Lock lock = new ReentrantLock(); private volatile double money; public Account(double money) { this.money = money; } public void add(double money) { lock.lock(); try { this.money += money; } finally { lock.unlock(); } } public void reduce(double money) { lock.lock(); try { this.money -= money; } finally { lock.unlock(); } } public double getMoney() {return this.money;} void lock() { lock.lock(); } void unlock() { lock.unlock(); } boolean tryLock() { return lock.tryLock(); } }
public class AccountManager { public static class NoEnoughMoneyException extends Exception {} //该方法问题:如果两个账户同时给对方转账,都先获得了第一个锁,则会发生死锁 public static void transfer(Account from, Account to, double money) throws NoEnoughMoneyException { from.lock(); try { to.lock(); try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { System.out.println("The from account money is " + from.getMoney()); System.out.println("The money is " + money); throw new NoEnoughMoneyException(); } } finally { to.unlock(); } } finally { from.unlock(); } } //避免死锁的方法 public static boolean tryTransfer(Account from, Account to, double money) throws NoEnoughMoneyException { if (from.tryLock()) { try { if (to.tryLock()) { try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { throw new NoEnoughMoneyException(); } return true; } finally { to.unlock(); } } } finally { from.unlock(); } } return false; } //模拟账户转账的死锁过程 public static void simulateDeadLock() throws InterruptedException { final int accountNum = 10; final Account[] accounts = new Account[accountNum]; final Random random = new Random(); for (int i = 0; i < accountNum; i++) { accounts[i] = new Account(random.nextInt(1000000)); System.out.println("account[" + i + "]" + accounts[i].getMoney()); } int threadNum = 100; Thread[] threads = new Thread[threadNum]; for (int i = 0; i < threadNum; i++) { threads[i] = new Thread() { @Override public void run() { int loopNum = 100; for (int k = 0; k < loopNum; k++) { int i = random.nextInt(accountNum); int j = random.nextInt(accountNum); int money = random.nextInt(10); if (i != j) { try { System.out.println( "Thread account[" + i + "]"+ accounts[i].getMoney()); //会发生死锁 transfer(accounts[i], accounts[j], money); } catch (Exception e) { e.printStackTrace(); } } } } }; threads[i].start(); } } public static void main(String[] args) { try { simulateDeadLock(); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.ReentrantLock的实现原理
在最底层,该类依赖于CAS方法,另外,它依赖于LockSupport的一些方法。
1)LockSupport
LockSupport类的基本方法:
//使当前线程放弃CPU,进入等待(WAITING)状态 public static void park() //使指定线程恢复可运行状态 public static void unpark(Thread thread)
Thread t = new Thread(){ @Override public void run() { LockSupport.park(); System.out.println("exit"); } }; t.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.unpark(t);
park不同于Thread.yield(),yield只是告诉操作系统可以先让其他线程运行,
但自己仍然是可运行状态。另外,park是响应中断的,当有中断发生时,park
会返回,线程的中断状态会被设置。另外,还需要说明的,park可能会无缘无故
地返回,程序应该重新检查park等待条件是否满足。park的两个变体:
//可指定最长等待时间,参数是相对于当前时间的纳秒数 public static void parkNanos(long nanos) //指定最长等待到什么时候,参数是绝对时间 public static void parkUntil(long deadline)
当等待超时,它们也会返回。
也可以指定一个对象,表示是由于该对象而进行等待,以便调试:
public static void park(Object blocker)
返回一个线程的blocker对象:
public static Object getBlocker(Thread t)
2)AQS
Java提供了一个抽象类AbstractQueuedSynchronizer简称AQS,用于各种锁的代码复用。
AQS封装了一个状态,给子类提供查询和设置状态的方法:
private volatile int state; protected final int getState() protected final void setState(int newState) protected final boolean compareAndSetState(int expect, int update)
用于实现锁时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:
private transient Thread exclusiveOwnerThread(); protected final void setExclusiveOwnerThread(Thread t); protected final Thread getExclusiveOwnerThread();
下面,我们以ReentrantLock的使用为例简要介绍AQS的原理:
ReentrantLock内部使用AQS,有三个内部类:
abstract static class Sync extends AbstractQueuedSynchronizer //fair为false使用该类 static final class NonfairSync extends Sync //fair为true时使用该类 static final class FairSync extends Sync
ReentrantLock内部有一个Sync成员:
private final Sync sync;
在构造方法中被赋值:
public ReentrantLock() { sync = new NonfairSync(); }
lock/unlock方法的实现:
public void lock() { sync.lock(); }
sync默认类型是NonfairSync,NonfairSync的lock代码为:
final void lock() { if(compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
ReentrantLock使用state表示是否被锁和持有数量,如果当前未被锁定,则立即获得锁,
否则调用acquire(1)获得锁。acquire()是AQS中的方法,代码为:
public final void acquire(int arg) { if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它调用tryAcquire()获取锁,tryAcquire必须被子类重写。NofairSync的实现为:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果未被锁定,则进行锁定 if(c == 0) { if(compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果已经锁定,增加锁定次数,这里state存放了锁定次数 else if(current == getExclusiveOwnerThread()) { int nextc = c + acquires; if(nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
如果tryAcquire()返回false,则AQS会调用:
//addWaiter()会新建一个节点Node,代表当前线程,然后加入内部等待队列 //放入等待队列后,调用acquireQueued尝试获得锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for(;;) { //首先检查当前节点是不是第一个等待节点 //如果是且能得到锁则将当前节点从等待队列中移除并返回 final Node p = node.predecessor(); if(p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //否则最终调用LockSupport.park放弃CPU,进入等待, //被唤醒后检查是否中断,记录中断标志 if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if(failed) cancelAcquire(node); } }
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
总结lock方法的基本过程:能获得锁就立即获得,否则加入等待队列,被唤醒后检查
自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待。这个过程
如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。
ReentrantLock的unlock代码为:
public void unlock() { sync.release(1); }
public final boolean release(int arg) { //tryRelease会修改状态释放锁 if(tryRelease(arg)) { Node h = head; if(h != null && h.waitStatus != 0) //会调用LockSupport.unpark将第一个等待的线程唤醒 unparkSuccessor(h); return true; } return false; }
4.对比ReentrantLock和synchronized
相比synchronized,ReentrantLock,支持以非阻塞方式获取锁,
可以响应中断,可以限时,更为灵活。不过synchronized使用更简单
性能通常更好,总之能用synchronized就不要用ReentrantLock。
三、显式条件
1.用法
锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronized相对应,
而显式条件与wait/notify相对应。显式锁与显式条件配合使用。Lock接口中定义的创建
显示条件的方法:
Condition newCondition();
Condition接口:
public interface Condition { //对应于Object的wait void await() throws InterruptedException; //不响应中断的等待,该方法不会由于中断结束,但当它返回时 //如果等待过程中发生中断,中断标志会被设置 void awaitUninterruptibly(); //等待相对时间,单位纳秒 long awaitNanos(long nanosTimeout) throws InterruptedException; //等待相对时间 boolean await(long time, TimeUnit unit) throws InterruptedException; //等待绝对时间 boolean awaitUntil(Date deadline) throws InterruptedException; //对应于notify void signal(); void signalAll(); }
与Object的wait方法一样,调用await方法前先要索取锁,如果没有锁会抛出异常。
当await在进入等待队列后,会释放锁,释放CPU,当其他线程唤醒它后,或等待
超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从await方法中退出。
另外,与Object的wait方法一样,await返回后就不一定代表等待的条件就满足了,
通常要将await的调用放到一个循环内,只有条件满足后才退出。示例:
public class WaitThread extends Thread{ private volatile boolean fire = false; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); try { while (!fire){ condition.await(); } } finally { lock.unlock(); } System.out.println("fired"); } catch (Exception e) { e.printStackTrace(); } } public void fire() { lock.lock(); try { this.fire = true; condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { WaitThread waitThread = new WaitThread(); waitThread.start(); Thread.sleep(1000); System.out.println("fire"); waitThread.fire(); } }
2.生产者/消费者模式
在使用wait/notify实现生产者/消费者的时候有一个局限就是只能有一个条件队列,
而使用显式锁和条件不存在这个问题,例如:
public class MyBlockingQueue<E> { private Queue<E> queue = null; private int limit; private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public MyBlockingQueue(int limit) { this.limit = limit; queue = new ArrayDeque<E>(limit); } public void put(E e) throws InterruptedException { lock.lockInterruptibly(); try { while (queue.size() == limit) { notFull.await(); } queue.add(e); notEmpty.signal(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lockInterruptibly(); try { while (queue.isEmpty()) { notEmpty.await(); } E e = queue.poll(); notFull.signal(); return e; } finally { lock.unlock(); } } }
3.实现原理
ReentrantLock的newCondition():
public Condition newCondition() { return sync.newCondition(); }
sync的newCondition方法:
final ConditionObject newCondition() { return new ConditionObject(); }
ConditionObject是AQS的一个内部类,其内部有一个队列,
表示条件等待队列,其成员声明:
//条件队列的头节点 private transient Node firstWaiter; //条件队列的尾节点 private transient Node lastWaiter;
public final void await() throws InterruptedException { //如果等待前中断标志位已经被设置,直接抛异常 if (Thread.interrupted()) throw new InterruptedException(); //1.为当前进程创建节点,加入条件等待队列 Node node = addConditionWaiter(); //2.释放持有的锁 int savedState = fullyRelease(node); int interruptMode = 0; //3.放弃CPU进行等待,直到被中断或isOnSyncQueue变为true //isOnSyncQueue,表示该节点被其他线程从条件等待队列 //移到了外部锁等待队列,等待的条件已经满足 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //重新获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }