死锁发生在线程之间时,指两个线程各自持有的锁刚好是对方线程请求的,相互等待,系统阻塞。这种死锁往往是线程持有两个以上的锁,且以不同的顺序请求锁导致的,因此也叫锁顺序死锁。看例子:
package com.wulinfeng.concurrent; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class DemonstrateDeadLock { private static final int NUM_THREADS = 20; // 起20个线程处理 private static final int NUM_ACCOUNTS = 5; // 5个账户进行转账 private static final int NUM_ITERATIONS = 100000; // 转个10万次 public static void main(String[] args) { final DemonstrateDeadLock lock = new DemonstrateDeadLock(); final Random r = new Random(); final Account[] accounts = new Account[NUM_ACCOUNTS]; for (int i = 0; i < accounts.length; i++) { int money = r.nextInt(100000); // 为了避免出现转出账户金额小于转账金额的情况,5个账户的初始资金必须1万到10万之间 if (money < 10000) { money = 10000; } accounts[i] = lock.new Account(money); } class TransferThread extends Thread { public void run() { // 5个账户之间随机相互转账,每次转几块钱,每个线程转个10万次 for (int i = 0; i < NUM_ITERATIONS; i++) { int fromAcct = r.nextInt(NUM_ACCOUNTS); int toAcct = r.nextInt(NUM_ACCOUNTS); // 避免转给自己 if (fromAcct == toAcct) { toAcct = NUM_ACCOUNTS - fromAcct - 1; } DollarAmount amount = lock.new DollarAmount(r.nextInt(10)); System.out.printf("第%d次:从账户%d到账户%d转%d元 ", i, fromAcct, toAcct, amount.getAmount()); transferMoneyWithDeadLock(accounts[fromAcct], accounts[toAcct], amount); } } /** * 每次转账都会先锁转出账户A,再锁转入账户B,然后开始转账。因为被调用时参数顺序不可预测,因此锁顺序也不可预测 * 如:并发,H线程是A账户->B账户,J线程是B账户->A账户,传参顺序和锁顺序也分别是A->B,B->A, * H线程持有A锁等待B锁,J线程持有B锁等待A锁,互相等待,死锁 * * @param fromAccount * @param toAccount * @param amount */ public void transferMoneyWithDeadLock(Account fromAccount, Account toAccount, DollarAmount amount) { synchronized (fromAccount) { synchronized (toAccount) { if (fromAccount.getBalance().compareTo(amount) < 0) { throw new IllegalAccessError("not enough money to transfer."); } else { fromAccount.debit(amount); toAccount.credit(amount); } } } } private final Object tieLock = new Object(); /** * 5个账户的hashCode值是唯一的,每次先锁住较小值的账户可以实现每次都按顺序锁 * 如H线程是A账户->B账户,J线程是B账户->A账户,A的hashCode:1,B的hashCode:2 * 因为hashCode上A<B,所以锁获取顺序H:A->B;J:A->B,两个线程以相同顺序获取锁,不会导致相互等待的情况 * * @param fromAcct * @param toAcct * @param amount */ public void transferMoneyWithoutDeadLock(final Account fromAcct, final Account toAcct, final DollarAmount amount) { class Helper { public void transfer() { if (fromAcct.getBalance().compareTo(amount) < 0) { throw new IllegalAccessError("not enough money to transfer."); } else { fromAcct.debit(amount); toAcct.credit(amount); } } } int fromHash = System.identityHashCode(fromAcct); int toHash = System.identityHashCode(toAcct); if (fromHash < toHash) { synchronized (fromAcct) { synchronized (toAcct) { new Helper().transfer(); } } } else if (fromHash > toHash) { synchronized (toAcct) { synchronized (fromAcct) { new Helper().transfer(); } } } else { synchronized (tieLock) { synchronized (fromAcct) { synchronized (toAcct) { new Helper().transfer(); } } } } } public void transferMoneyWithLock(Account fromAcct, Account toAcct, DollarAmount amount) { long timeout = 100; // 转账超过100毫秒即超时 long stopTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout); while (true) { if (fromAcct.lock.tryLock()) { // 转入账户获取到锁才进入下面逻辑处理,获取不到则退出 try { if (toAcct.lock.tryLock()) { // 转出账户获取到锁才进入下面逻辑处理,获取不到则退出 try { if (fromAcct.getBalance().compareTo(amount) < 0) { throw new IllegalAccessError("not enough money to transfer."); } else { fromAcct.debit(amount); toAcct.credit(amount); return; // 转账成功,退出循坏 } } finally { toAcct.lock.unlock(); // 释放转出账户锁 } } } finally { fromAcct.lock.unlock(); // 释放转出账户锁 } } // 走到这里,说明获取转入或者转出锁失败,若不超时则退出重入 if (System.nanoTime() < stopTime) { return; } // 否则休眠随机时间,避免活锁 try { Thread.sleep(r.nextLong()); } catch (InterruptedException e) { e.printStackTrace(); } } } } for (int i = 0; i < NUM_THREADS; i++) { new TransferThread().start(); } } /** * 账户,提供转出、转入方法 * * @author Administrator * */ class Account { private DollarAmount balance; private Lock lock; public Account(int account) { this.balance = new DollarAmount(account); this.lock = new ReentrantLock(); } public DollarAmount getBalance() { return balance; } public void debit(DollarAmount amount) { this.balance.remove(amount); } public void credit(DollarAmount amount) { this.balance.add(amount); } } /** * 转账金额,提供加钱、减钱方法 * * @author Administrator * */ class DollarAmount { private int amount; public DollarAmount(int amount) { this.amount = amount; } public int compareTo(DollarAmount amount2) { int result; if (this.amount > amount2.getAmount()) { result = 1; } else if (this.amount < amount2.getAmount()) { result = -1; } else { result = 0; } return result; } public int add(DollarAmount amount) { return this.amount += amount.getAmount(); } public int remove(DollarAmount amount) { return this.amount -= amount.getAmount(); } public int getAmount() { return amount; } } }
解决死锁有两种方法,一是使用内置锁通过保证锁顺序一致,参见上面代码里的transferMoneyWithoutDeadLock方法;二是使用显式锁的tryLock进行重试(获取锁失败则退出而不是阻塞),参见上面的transferMoneyWithLock方法。
另一种比较隐蔽的锁顺序不一致的情况发生在对象的方法调用,见下面代码:
package com.wulinfeng.concurrent; import java.util.HashSet; import java.util.Set; public class ObjectsDeadLock { private static final int NUM_THREADS = 20; // 起20个线程处理 // 随机产生的字符串 private static final String[] msgs = new String[] { "abc", "dfg", "hij", "lmn", "opq", "rst", "wlf", "xyz" }; private static final int NUM_ITERATIONS = 100000; // 每个线程跑个10万次 public static void main(String[] args) { final ObjectsDeadLock lock = new ObjectsDeadLock(); final Dispatcher dispatcher = lock.new Dispatcher(); final Taxi taxi = lock.new Taxi(dispatcher); class WorkThread extends Thread { public void run() { for (int i = 0; i < NUM_ITERATIONS; i++) { taxi.setStrDead(msgs[i % msgs.length]); dispatcher.getStrDead(i); } }; } for (int i = 0; i < NUM_THREADS; i++) { new WorkThread().start(); } } class Taxi { private String msg; private final Dispatcher dispatcher; public Taxi(Dispatcher dispatcher) { this.dispatcher = dispatcher; } public synchronized String getStr() { return msg; } /** * 该方法先给自己加锁,再调用Dispatcher对象的加锁方法notifyAvailable,加锁顺序Taxi->Dispatcher * * @param msg */ public synchronized void setStrDead(String msg) { if (msg.equals("wlf")) { this.msg = msg; dispatcher.notifyAvailable(this); } } /** * 只给自己加锁,调用其他对象方法不加锁:锁Taxi->释放Taxi->锁Dispatcher->释放Dispatcher * * @param msg */ public void setStrNotDead(String msg) { boolean available = false; synchronized (this) { if (msg.equals("wlf")) { this.msg = msg; available = true; } } if (available) { dispatcher.notifyAvailable(this); } } } class Dispatcher { private final Set<Taxi> availableTaxis; public Dispatcher() { availableTaxis = new HashSet<Taxi>(); } public synchronized void notifyAvailable(Taxi taxi) { availableTaxis.add(taxi); } /** * 该方法里调用Taxi对象的加锁方法getStr,加锁顺序Dispatcher->Taxi */ public synchronized void getStrDead(int count) { for (Taxi t : availableTaxis) { System.out.println(t.getStr() + ": " + count); } availableTaxis.clear(); // 清空集合,准备迎接下一个符合条件的对象 } /** * 只给自己加锁,调用其他对象方法不加锁:锁Dispatcher->释放Dispatcher->锁Taxi->释放Taxi * * @param count */ public void getStrNotDead(int count) { Set<Taxi> copy; // 复制给当前线程读,不影响其他线程写,读写分离 synchronized (this) { copy = new HashSet<Taxi>(availableTaxis); } for (Taxi t : copy) { System.out.println(t.getStr() + ": " + count); } // 读完清空 synchronized (this) { availableTaxis.clear(); } } } }
这里无法改变线程里锁的顺序,因此只能减锁,就是在加锁方法A里调用另一个对象的加锁方法B时,缩小加锁的代码块,让加锁方法B逃离方法A的锁。run方法里改两行代码再跑一次,死锁不再出现:
public void run() { for (int i = 0; i < NUM_ITERATIONS; i++) { taxi.setStrNotDead(msgs[i % msgs.length]); dispatcher.getStrNotDead(i); } };
死锁发生在线程池中时,指的是池里的某任务无限期的等待它所依赖的其他任务,这也叫线程饥饿死锁。看例子:
package com.wulinfeng.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ThreadDeadlock { ExecutorService es = Executors.newSingleThreadExecutor(); public class ReaderTask implements Callable<String> { Future<String> header; String body = "Hell, world!"; String result = null; @Override public String call() throws Exception { // 单线程池提交子任务 header = es.submit(new LoadFileTask("welcome!")); result = header.get() + " " + body + " "; System.out.printf("result: %s", result); return result; } } public class LoadFileTask implements Callable<String> { private String msg; public LoadFileTask(String msg) { this.msg = msg; } public String call() { try { Thread.sleep(1000); // 模拟文件下载,耗时1秒 } catch (InterruptedException e) { e.printStackTrace(); } return msg; } } public static void main(String[] args) { ThreadDeadlock t = new ThreadDeadlock(); try { // 单线程池提交父任务 Future<String> father = t.es.submit(t.new ReaderTask()); // 获取父任务结果依赖子任务结果,线程池只允许一个线程执行,父等子执行,子等父让位 System.out.printf("out: %s", father.get()); } catch (Exception e) { e.printStackTrace(); } finally { t.es.shutdown(); } } }
上面使用了单线程的线程池,也就是每次只能容纳一个线程做串行执行的线程池。父线程需要启动子线程获取结果才能返回,而子线程需要父线程执行结束才能启动,因此父线程和子线程互相无限等待。解决方式很简单,线程池需要允许容纳两个以上的线程就可避免线程饥饿,如把newSingleThreadExecutor改为newCachedThreadPool。