zoukankan      html  css  js  c++  java
  • Java并发编程实战 第10章 避免活跃性危险

    死锁

    经典的死锁:哲学家进餐问题。5个哲学家 5个筷子 如果没有哲学家都占了一个筷子 互相等待筷子 陷入死锁

    数据库设计系统中一般有死锁检测,通过在表示等待关系的有向图中搜索循环来实现。

    JVM没有死锁检测。

    锁顺序死锁

    典型的列子:LeftRightDeadLock,多个线程尝试去获取左右的锁。这个和哲学家吃饭的两个筷子类似。

    下面的转账锁顺序死锁:

    1. public class DynamicOrderDeadlock {
    2.    public static void transferMoney(Account fromAccount, Account toAccount,
    3.          DollarAmount amount) throws InsufficientFundsException {
    4.       synchronized (fromAccount) {
    5.          synchronized (toAccount) {
    6.             if (fromAccount.getBalance().compareTo(amount) < 0)
    7.                return;
    8.             else {
    9.                fromAccount.debit(amount);
    10.                toAccount.credit(amount);
    11.             }
    12.          }
    13.       }
    14.    }
    15. }

    如果两个线程同时调用transferMoney,其中一个线程从X向Y转账,另一个线程从Y向X转账,那么就会发生死锁:

    A: transferMoney(myAccount, yourAccount, 10);

    B: transferMoney(yourAccount, myAccount, 20);

    如果执行时序不当,那么A可能获得myAccount的锁并等待yourAccount的锁,然而B此时持有yourAccount的锁,并正在等待myAccount的锁。

    如何检查这种死锁——查看是否存在嵌套的锁获取操作。由于我们无法控制参数的顺序,因此要解决这个问题,必须定义锁的顺序,并在整个应用程序中按照这个顺序来获取锁。就是说不论是myAccount转账到yourAccount,还是反过来,我们都先获取myAccount上的锁。

    1. public class InduceLockOrder {
    2.    private static final Object tieLock = new Object();
    3.  
    4.    public void transferMoney(final Account fromAcct, final Account toAcct,
    5.          final DollarAmount amount) throws InsufficientFundsException {
    6.       class Helper {
    7.          public void transfer() throws InsufficientFundsException {
    8.             if (fromAcct.getBalance().compareTo(amount) < 0)
    9.                return;
    10.             else {
    11.                fromAcct.debit(amount);
    12.                toAcct.credit(amount);
    13.             }
    14.          }
    15.       }
    16.       int fromHash = System.identityHashCode(fromAcct);
    17.       int toHash = System.identityHashCode(toAcct);
    18.       if (fromHash < toHash) {
    19.          synchronized (fromAcct) {
    20.             synchronized (toAcct) {
    21.                new Helper().transfer();
    22.             }
    23.          }
    24.       } else if (fromHash > toHash) {
    25.          synchronized (toAcct) {
    26.             synchronized (fromAcct) {
    27.                new Helper().transfer();
    28.             }
    29.          }
    30.       } else {
    31.          synchronized (tieLock) {
    32.             synchronized (fromAcct) {
    33.                synchronized (toAcct) {
    34.                   new Helper().transfer();
    35.                }
    36.             }
    37.          }
    38.       }
    39.    }
    40.  
    41. }

    上面程序使用System.identityHashCode方法,该方法将返回由Object.hashCode返回的值。在该版本中使用了System.identityHashCode来定义锁的顺序。

    在极少数情况下,两个对象可能拥有相同的散列值,此时必须通过某种任意的方法来决定锁的顺序,而这可能又会引入死锁。为了避免这种情况,可以使用"加时赛"锁。在获得两个Account之前,首先获得这个"加时赛"锁,从而保证每次只有一个线程以未知的顺序获得这两个锁,从而消除了死锁发生的可能性(只要一致地使用这种机制)。如果经常会出现散列冲突的情况,那么这种技术可能会成为并发性的一个瓶颈(这类似于在整个程序中只有一个锁的情况,比如上面的tieLock,它是他就是一个加时赛锁,这个锁是全局的,所有的线程共用,因为hashcode一样的情况少之又少,我们才可以做一个全局变量来控制这种顺序,否则就是真的串行了),但由于System.identityHashCode中出现散列冲突的频率非常低,因此这项技术以最小的代价,换来了最大的安全性。

    如果在Account中包含一个唯一的,不可变的并且具备可比性的键值,例如账号,那么要制定锁的顺序就更加容易了,通过键值对对象控制加锁顺序,因而不需要使用"加时赛"锁。

    在协作对象之间发生的死锁:

    如果在持有锁时调用某个外部方法,那么将出现活跃性问题。在这个外部方法中可能会获取其他锁(这可能会产生死锁)(我的理解:一旦你持有了两个锁,不论是在代码中显示持有的,还是在由于两个方法的调用隐式的持有了两个锁,在不同的线程顺序下,都要考虑死锁的隐患),或者阻塞时间过长,导致其他线程无法及时获得当前被持有的锁。如下代码:

    1. public class Taxi {
    2.     private final Dispatcher dispatcher;
    3.     private Point location, destination;
    4.  
    5.     public Taxi(Dispatcher dispatcher) {
    6.         this.dispatcher = dispatcher;
    7.     }
    8.  
    9.     public synchronized Point getLocation() {
    10.         return location;
    11.     }
    12.  
    13.     public synchronized void setLocation(Point location){
    14.         this.location = location;
    15.         if(location.equals(destination)){
    16.             dispatcher.notifyAvaliable(this);
    17.         }
    18.     }
    19. }
    20.  
    21. public class Dispatcher {
    22.     private final Set<Taxi> taxis;
    23.     private final Set<Taxi> avaliableTaxis;
    24.  
    25.     public Dispatcher() {
    26.         taxis = new HashSet<Taxi>();
    27.         avaliableTaxis = new HashSet<Taxi>();
    28.     }
    29.  
    30.     public synchronized void notifyAvaliable(Taxi taxi) {
    31.         avaliableTaxis.add(taxi);
    32.     }
    33.  
    34.     public synchronized Image getImage() {
    35.         Image image = new Image();
    36.         for (Taxi t : taxis) {
    37.             image.drawMarker(t.getLocation());
    38.         }
    39.         return image;
    40.     }
    41. }

    尽管没有任何方法会显式的获取两个锁但setLocation和getImage等方法的调用者都会获得两个锁。因为setLocation和notifyAvailable都是同步方法,因此调用setLocation的线程将首先获得Taxi的锁,然后获取Dispatcher的锁,同样调用getImage的线程将首先获取Dispatcher的锁,然后再获取每一个Taxi的锁,两个线程按照不同的顺序来获取两个锁,这时就有可能产生死锁。

    解决方案是开放调用(如果在调用某个方法时不需要持有锁,那么这种调用被称为开放调用(个人理解:其实就是不再方法上持有锁,而在方法内部持有锁)),使同步代码块仅被用于保护那些涉及共享状态的操作,修改代码如下:

    1. public class Taxi {
    2.     private final Dispatcher dispatcher;
    3.     private Point location, destination;
    4.  
    5.     public Taxi(Dispatcher dispatcher) {
    6.         this.dispatcher = dispatcher;
    7.     }
    8.  
    9.     public synchronized Point getLocation() {
    10.         return location;
    11.     }
    12.  
    13.     public synchronized void setLocation(Point location) {
    14.         boolean reachedLocation;
    15.         synchronized (this) {
    16.             this.location = location;
    17.             reachedLocation = location.equals(destination);
    18.         }
    19.         if (reachedLocation) {
    20.             dispatcher.notifyAvaliable(this);
    21.         }
    22.     }
    23. }
    24.  
    25. public class Dispatcher {
    26.     private final Set<Taxi> taxis;
    27.     private final Set<Taxi> avaliableTaxis;
    28.  
    29.     public Dispatcher() {
    30.         taxis = new HashSet<Taxi>();
    31.         avaliableTaxis = new HashSet<Taxi>();
    32.     }
    33.  
    34.     public synchronized void notifyAvaliable(Taxi taxi) {
    35.         avaliableTaxis.add(taxi);
    36.     }
    37.  
    38.     public Image getImage(){
    39.         Set<Taxi> copy;
    40.         synchronized (this){
    41.             copy = new HashSet<Taxi>();
    42.         }
    43.         Image image = new Image();
    44.         for(Taxi t: copy){
    45.             image.drawMarker(t.getLocation());
    46.         }
    47.         return image;
    48.     }
    49. }

      在程序中应尽量使用开放调用,与那些在持有锁时调用外部方法的程序相比,更易于对依赖于开放调用的程序进行死锁分析。

    ps:我一直理解的将锁块拆分的太细致了才会死锁,其实这么看来,我们应该细化了才能避免死锁呢。因为只有在同时拥有两个锁的时候才可能死锁,我们细化成不可能同时拥有两个锁。

    重新编写代码块以实现开放调用可能会破坏操作的原子性。在许多情况下,使某个操作失去原子性是可以接受的,也会带来更好的性能。

    资源死锁

    正如当多个线程相互持有彼此正在等待的锁而又不释放自己已持有的锁时会发生死锁,当它们在相同的资源集合上等待时,也会发生死锁。

    假设有两个资源池,例如两个不同数据库的连接池。资源池通常采用信号量来实现当资源池为空时的阻塞行为。如果一个任务需要连接两个数据库,并且在请求这两个资源时不会始终遵循相同的顺序,那么线程A可能持有与数据库D1的连接,并等待与数据库D2的连接,而线程B则持有与D2的连接并等待与D1的连接。(资源池越大,出现这种情况的可能性就越小。如果每个资源池都有N个连接,那么在发生死锁时不仅需要N个循环等待的线程,而且还需要大量不恰当的执行时序。)

    线程饥饿死锁

    另一种基于资源的死锁形式就是线程饥饿死锁:一个任务提交给另一个任务,并等待被提交任务在单线程的Executor中执行完成。这种情况下,第一个任务将永远等待下去,并使得另一个任务以及在这个Executor中执行的所有其他任务都停止执行。如果某些任务需要等待其他任务的结果,那么这些任务往往是产生线程饥饿死锁的主要来源,有界线程池 / 资源池与相互依赖的任务不能一起使用。

    1. package com.zjf;
    2.  
    3. import java.util.concurrent.Callable;
    4. import java.util.concurrent.ExecutionException;
    5. import java.util.concurrent.ExecutorService;
    6. import java.util.concurrent.Executors;
    7. import java.util.concurrent.Future;
    8.  
    9. public class ThreadDeadlock {
    10.  
    11.     ExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    12. // ExecutorService exec = Executors.newCachedThreadPool(); //如果添加给线程池中添加足够多的线程,就可以让所有任务都执行,避免饥饿死锁。
    13.  
    14.    /**
    15.     * 模拟页面加载的例子
    16.     *
    17.     * 产生死锁分析:
    18.     * RenderPageTask任务中有2个子任务分别是"加载页眉"和"加载页脚"。当提交RenderPageTask任务时,实际上是向线程池中添加了3个任务,
    19.     * 但是由于线程池是单一线程池,同时只会执行一个任务,2个子任务就会在阻塞在线程池中。而RenderPageTask任务由于得不到返回,也会
    20.     * 一直堵塞,不会释放线程资源让子线程执行。这样就导致了线程饥饿死锁。
    21.     *
    22.     * 在一个Callable任务中,要返回2个子任务
    23.     * @author hadoop
    24.     *
    25.     */
    26.     class RenderPageTask implements Callable<String>{
    27.       public String call() throws Exception {
    28.           Future<String> header,footer;
    29.  
    30.           header = exec.submit(new Callable<String>(){
    31.               public String call() throws Exception {
    32.                   System.out.println("加载页眉");
    33.                   Thread.sleep(2*1000);
    34.                   return "页眉";
    35.               }
    36.           });
    37.  
    38.  
    39.           footer = exec.submit(new Callable<String>(){
    40.  
    41.               public String call() throws Exception {
    42.                   System.out.println("加载页脚");
    43.                   Thread.sleep(3*1000);
    44.                   return "页脚";
    45.               }
    46.           });
    47.  
    48.           System.out.println("渲染页面主体");
    49.  
    50.           return header.get() + footer.get();
    51.       }
    52.  
    53.     }
    54.  
    55.   public static void main(String[] args) throws InterruptedException, ExecutionException {
    56.       ThreadDeadlock td = new ThreadDeadlock();
    57.       Future<String> futre = td.exec.submit(td.new RenderPageTask());
    58.       String result = futre.get();
    59.       System.out.println("执行结果为:" + result);
    60.  
    61.   }
    62.  
    63. }

    死锁的避免与诊断

    如果一个线程每次至多只能获得一个锁,那么就不会产生锁顺序死锁。当然,这种情况通常并不现实,但如果能够避免这种情况,那么就能省去很多工作。如果必须获取多个锁,那么在设计时必须考虑锁的顺序:尽量减少潜在的加锁交互数量,将获取锁时需要遵循的协议写入正式文档并始终遵循这些文档。

    在使用细粒度锁的程序中,可以通过使用一种两阶段策略来检查代码中的死锁:首先,找出在什么地方将获取多个锁(使这个集合尽量小),然后对所有这些实例进行全局分析,从而确保它们在整个程序中获取锁的顺序都保持一致。尽可能地使用开放调用,这能极大地简化分析过程。如果所有的调用都是开放调用,那么要发现获取多个锁的实例是非常简单的,可以通过代码审查,或者借助自动化的源代码分析工具。

    还有一项技术可以检测死锁和从死锁中恢复过来,即显式使用Lock类中的定时tryLock功能来代替内置锁机制。当使用内置锁时,只要没有获得锁,就会永远等待下去,而显式锁则可以指定一个超时时限,在等待超过该时间后tryLock会返回一个失败信息。

    其他活跃性危险

    尽管死锁是最常见的活跃性危险,但在并发程序中还存在一些其他的活跃性危险,包括:饥饿、丢失信号和活锁。

    饥饿

    当线程由于无法访问它所需要的资源时而不能继续执行时,就发生了"饥饿"。引发饥饿的最常见资源就是CPU时钟周期。如果在Java应用程序中对线程的优先级使用不当,或者在持有锁时执行一些无法结束的结构(例如无限循环,或者无限制地等待某个资源),那么也可能导致饥饿,因为其他需要这个锁的线程将无法得到它。

    操作系统的线程调度器会尽力提供公平的、活跃性良好的调度,甚至超出Java语言规范的需求范围。在大多数Java应用程序中,所有线程都具有相同的优先级Thread.NORM_PRIORITY。线程优先级并不是一种直观的机制,而通过修改线程优先级带来的效果通常也不明显。当提高某个线程优先级时,可能不会起到任何作用,或者也可能使得某个线程的调度优先级高于其他线程,从而导致饥饿。

    通常,我们尽量不要改变线程的优先级。只要改变了线程的优先级,程序的行为就将与平台相关,并且会导致发生饥饿问题的风险。你经常能发现某个程序会在一些奇怪的地方调用Thread.sleep或Thread.yield,这是因为该程序视图克服优先级调整问题或响应性问题,并试图让低优先级的线程执行那个更多地时间。

    要避免使用线程优先级,因为这会增加平台依赖性,并可能导致活跃性问题。在大多数并发应用程序中,都可以使用默认的线程优先级。

    活锁

    活锁是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行,因为活锁会导致线程将不断重复执行相同的操作,而且总会失败。

    活锁的典型案例:活锁通常发生在处理事务消息的应用程序中:如果不能成功地处理某个消息,那么消息处理机制将回滚整个事务,并将它重新放到队列的开头。如果消息处理器在处理某种特定类型的消息时存在错误并导致它失败,那么每当这个消息从队列中取出并传递到存在错误的处理器时,都会发生事务回滚。由于这条消息又被放到队列开头,因此处理器将被反复调用,并返回相同的结果。(有时候也被称为毒药消息)虽然处理消息的线程并没有阻塞,但也无法继续执行下去。这种形式的活锁通常是由过度的错误恢复代码造成的,因为它错误地将不可修复的错误作为可修复的错误。

    活锁的定义:当多个相互协作的线程都对彼此进行响应从而修改各自的状态,并使得任何一个线程都无法继续执行时,就发生活锁。这就像两个过于礼貌的人在半路上面对面地相遇:他们彼此都让出对方的路,然而又在另一条路上相遇了。因此他们就这样反复地避让下去。

    解决方案:要解决这种活锁问题,需要在重试机制中引入随机性。例如,在网络上,如果两台机器尝试使用相同的载波来发送数据包,那么这些数据包就会发生冲突。这两台机器都检测到了冲突,并都在稍后再次重发。如果二者都选择在1秒钟后进行重试,那么它们又会发生冲突,并且不断地冲突下去,因而即使由大量闲置的带宽,也无法使数据包发送出去。为了避免这种情况发生,需要让它们分别等待一段随机地时间。在并发应用程序中,通过等待随机长度的时间和回退可以有效地避免活锁的发生。

    ps:活锁一般出现在重试机制出现问题的情况。

  • 相关阅读:
    C++-类的const成员变量
    Linux-编译器gcc/g++编译步骤
    C++-理解构造函数、析构函数执行顺序
    Linux-Unix版本介绍
    C++-const_cast只能用于指针和引用,对象的const到非const可以用static_cast
    Linux-如何查看登陆shell的类型
    C++-不要在构造和析构函数中调用虚函数
    C++-模板的声明和实现为何要放在头文件中
    C++-函数模板特化如何避免重复定义
    Linux-Gcc生成和使用静态库和动态库详解
  • 原文地址:https://www.cnblogs.com/xiaolang8762400/p/7071667.html
Copyright © 2011-2022 走看看