zoukankan      html  css  js  c++  java
  • 第七章:取消与关闭——Java并发编程实战

    Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用

    中断:一种协作机制,能够使一个线程终止另一个线程的当前工作

    立即停止会使共享的数据结构处于不一致的状态,需要停止时,发出中断请求,被要求中断的线程处理完他当前的任务后会自己判断是否停下来

    一、任务取消

    若外部代码能在某个操作正常完成之前将其置入“完成”状态,则还操作是可取消的。(用户请求取消、有时间限制的操作<并发查找结果,一个线程找到后可取消其他线程>、应用程序事件、错误、关闭)

    取消策略:详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作

    举例:设置volatile变量为取消标志,每次执行前检查

     1 private volatile boolean canceled;
     2     
     3     @Override
     4     public void run() {
     5         BigInteger p = BigInteger.ONE;
     6         while (!canceled){
     7             p = p.nextProbablePrime();
     8             synchronized (this) { //同步添加素数
     9                 primes.add(p);
    10             }
    11         }
    12     }

    注意:这是一个有问题的取消方式,若线程阻塞在add操作后,那么即使设置了取消状态,它也不会运行到检验阻塞状态的代码,因此会永远阻塞

    1、中断

      线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。(在取消之外的其他操作使用中断都是不合适的)

      调用interrupt并不意味者立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。会在下一个取消点中断自己,如wait, sleep,join等

    1 public class Thread {
    2      public void interrupt() { ... }//中断目标线程,恢复中断状态
    3      public boolean isInterrupted() { ... }//返回目标线程的中断状态
    4      public static boolean interrupted() { ... }//清除当前线程的中断状态,并返回它之前的值(用于已经设置了中断状态,但还尚未相应中断)
    5      ...
    6  }

    阻塞库方法,例如Thread.sleep和Object.wait等,都会检查线程何时中断,并且在发现时提前返回。它们在响应中断时执行的操作包括 : 清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束

    • 显示的检测中断!Thread.currentThread().isInterrupted()后推出
    • 阻塞方法中抓到InterruptedException后退出

    2、中断策略——规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作

      由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

    3、响应中断

    • 传递异常(throws InterruptedException)

    • 恢复中断状态,从而事调用栈的上层代码能够对其进行处理。(Thread.currentThread().interrupt();)

    4、通过Future实现取消

      boolean cancel(boolean mayInterruptIfRunning);

    • 如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败,返回false
    • 调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行
    • 如果任务已经执行,mayInterruptIfRunning参数决定了是否向执行任务的线程发出interrupt操作

    5、处理不可中断的阻塞——对于某些阻塞操作,只是设置了中断状态

    • Java.io包中的同步Socket I/O。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。
    • Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptedException)并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。
    • Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。
    • 获取某个锁。如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。
     1 //改写interrupt方法发出中断请求   
     2  @Override
     3     public void interrupt() {
     4         try {
     5             socket.close(); //中断前关闭socket
     6         } catch (IOException e) {
     7             
     8         } finally{
     9             super.interrupt();
    10         }
    11     }

    6、采用newTaskFor来封装非标准的取消

    二、停止基于线程的服务

    应用程序通常会创建基于线程的服务,如线程池。这些服务的时间一般比创建它的方法更长。

    • 服务退出 -> 线程需要结束  无法通过抢占式的方法来停止线程,因此它们需要自行结束
    • 除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等
    • 线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池
    • 应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序不能拥有工作者线程,因此应用程序不能直接停止工作者线程。

    服务应该生命周期方法关闭它自己以及他拥有的线程

    • 要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法
    • ExecutorService提供的shutdown(), shutdownNow()

    1、示例:日志服务

     1 // LogWriter就是一个基于线程的服务,但不是一个完成的服务
     2 public class LogWriter {
     3     //日志缓存
     4     private final BlockingQueue<String> queue;
     5     private final LoggerThread logger;//日志写线程
     6 private static final int CAPACITY = 1000;
     7 
     8     public LogWriter(Writer writer) {
     9         this.queue = new LinkedBlockingQueue<String>(CAPACITY);
    10         this.logger = new LoggerThread(writer);
    11     }
    12 
    13 public void start() { logger.start(); }
    14 
    15     //应用程序向日志缓存中放入要记录的日志
    16     public void log(String msg) throws InterruptedException {
    17         queue.put(msg);
    18 }
    19 
    20     //日志写入线程,这是一个多生产者,单消费者的设计
    21     private class LoggerThread extends Thread {
    22         private final PrintWriter writer;
    23         public LoggerThread(Writer writer) {
    24             this.writer = new PrintWriter(writer, true); // autoflush
    25         }
    26         public void run() {
    27             try {
    28                 while (true)
    29                    writer.println(queue.take());
    30             } catch(InterruptedException ignored) {
    31             } finally {
    32                 writer.close();
    33             }
    34         }
    35     }
    36 }

    注意:可以中断阻塞的take()方法停止日志线程(消费者线程),但生产者没有专门的线程,没办法取消

     1 //日志服务,提供记录日志的服务,并有管理服务生命周期的相关方法
     2 public class LogService {
     3        private final BlockingQueue<String> queue;
     4        private final LoggerThread loggerThread;// 日志写线程
     5        private final PrintWriter writer;
     6        private boolean isShutdown;// 服务关闭标示
     7        // 队列中的日志消息存储数量。我们不是可以通过queue.size()来获取吗?
     8        // 为什么还需要这个?请看后面
     9        private int reservations;
    10 
    11        public LogService(Writer writer) {
    12               this.queue = new LinkedBlockingQueue<String>();
    13               this.loggerThread = new LoggerThread();
    14               this.writer = new PrintWriter(writer);
    15 
    16        }
    17 
    18        //启动日志服务
    19        public void start() {
    20               loggerThread.start();
    21        }
    22 
    23        //关闭日志服务
    24        public void stop() {
    25               synchronized (this) {
    26                      /*
    27                       * 为了线程可见性,这里一定要加上同步,当然volatile也可,
    28                       * 但下面方法还需要原子性,所以这里就直接使用了synchronized,
    29                       * 但不是将isShutdown定义为volatile
    30                       */
    31                      isShutdown = true;
    32               }
    33               //向日志线程发出中断请求
    34               loggerThread.interrupt();
    35        }
    36 
    37        //供应用程序调用,用来向日志缓存存放要记录的日志信息
    38        public void log(String msg) throws InterruptedException {
    39               synchronized (this) {
    40                      /*
    41                       * 如果应用程序发出了服务关闭请求,则不存在接受日志,而是直接
    42                       * 抛出异常,让应用程序知道
    43                       */
    44                      if (isShutdown)
    45                             throw new IllegalStateException(/*日志服务已关闭*/);
    46                      /*
    47                       * 由于queue是线程安全的阻塞队列,所以不需要同步(同步也可
    48                       * 但并发效率会下降,所以将它放到了同步块外)。但是这里是的
    49                       * 操作序列是由两个操作组成的:即先判断isShutdown,再向缓存
    50                       * 中放入消息,如果将queue.put(msg)放在同步外,则在多线程环
    51                       * 境中,LoggerThread中的  queue.size() == 0 将会不准确,所
    52                       * 以又要想queue.put不同步,又要想queue.size()计算准确,所
    53                       * 以就使用了一个变量reservations专用来记录缓存中日志条数,
    54                       * 这样就即解决了同步queue效率低的问题,又解决了安全性问题,
    55                       * 这真是两全其美
    56                       */
    57                      //queue.put(msg);
    58                      ++reservations;//存储量加1
    59               }
    60               queue.put(msg);
    61        }
    62 
    63        private class LoggerThread extends Thread {
    64               public void run() {
    65                      try {
    66                             while (true) {
    67                                    try {
    68                                           synchronized (LogService.this) {
    69                                                  // 由于 queue 未同步,所以这里不能使用queue.size
    70                                                  //if (isShutdown && queue.size() == 0)
    71 
    72                                                  // 如果已关闭,且缓存中的日志信息都已写入,则退出日志线程
    73                                                  if (isShutdown && reservations == 0)
    74                                                         break;
    75                                           }
    76                                           String msg = queue.take();
    77                                           synchronized (LogService.this) {
    78                                                  --reservations;
    79                                           }
    80                                           writer.println(msg);
    81                                    } catch (InterruptedException e) { /* 重试 */
    82                                    }
    83                             }
    84                      } finally {
    85                             writer.close();
    86                      }
    87               }
    88        }
    89 }

    注意:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提提交消息的权利

    2、关闭ExecutorService

      shutdown():启动一次顺序关闭,执行完以前提交的任务,没有执行完的任务继续执行完

      shutdownNow():试图停止所有正在执行的任务(向它们发出interrupt操作语法,无法保证能够停止正在处理的任务线程,但是会尽力尝试),并暂停处理正在等待的任务,并返回等待执行的任务列表。

      ExecutorService已关闭,再向它提交任务时会抛RejectedExecutionException异常

    3、“毒丸”对象——当得到这个对象时,立即停止

      在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作

    4、只执行一次的服务

      如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一次私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。

     1 boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
     2         throws InterruptedException {
     3 ExecutorService exec = Executors.newCachedThreadPool();
     4 //这里不能使用 volatile hasNewMail,因为还需要在匿名内中修改
     5     final AtomicBoolean hasNewMail = new AtomicBoolean(false);
     6     try {
     7         for (final String host : hosts)//循环检索每台主机
     8             exec.execute(new Runnable() {//执行任务
     9                 public void run() {
    10                    if (checkMail(host))
    11                        hasNewMail.set(true);
    12                 }
    13             });
    14     } finally {
    15         exec.shutdown();//因为ExecutorService只在这个方法中服务,所以完成后即可关闭
    16         exec.awaitTermination(timeout, unit);//等待任务的完成,如果超时还未完成也会返回
    17     }
    18     return hasNewMail.get();
    19 }

    5、shutdown的局限性

    我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查

     1 public class TrackingExecutor extends AbstractExecutorService {
     2     private final ExecutorService exec;
     3     private final Set<Runnable> tasksCancelledAtShutdown =
     4             Collections.synchronizedSet(new HashSet<Runnable>());
     5 
     6     public TrackingExecutor(ExecutorService exec) {
     7         this.exec = exec;
     8     }
     9 
    10     public List<Runnable> getCancelledTasks() {//返回被取消的任务
    11         if (!exec.isTerminated())//如果shutdownNow未调用或调用未完成时
    12             throw new IllegalStateException(/*...*/);
    13         return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    14     }
    15 
    16     public void execute(final Runnable runnable) {
    17         exec.execute(new Runnable() {
    18             public void run() {
    19                 try {
    20                     runnable.run();
    21                             /*参考:http://blog.csdn.net/coslay/article/details/48038795
    22                              * 实质上在这里会有线程安全性问题,存在着竞争条件,比如程序刚
    23                              * 好运行到这里,即任务任务(run方法)刚好运行完,这时外界调用
    24                              * 了shutdownNow(),这时下面finally块中的判断会有出错,明显示
    25                              * 任务已执行完成,但判断给出的是被取消了。如果要想安全,就不
    26                              * 应该让shutdownNow在run方法运行完成与下面判断前调用。我们要
    27                              * 将runnable.run()与下面的if放在一个同步块、而且还要将
    28                              *  shutdownNow的调用也放同步块里并且与前面要是同一个监视器锁,
    29                              *  这样好像就可以解决了,不知道对不能。书上也没有说能不能解决,
    30                              *  只是说有这个问题!但反过来想,如果真的这样同步了,那又会带
    31                              *  性能上的问题,因为什么所有的任务都会串形执行,这样还要
    32                              *  ExecutorService线程池干嘛呢?我想这就是后面作者为什么所说
    33                              *  这是“不可避免的竞争条件”
    34                              */
    35                 } finally {
    36                                    //如果调用了shutdownNow且运行的任务被中断
    37                     if (isShutdown()
    38                             && Thread.currentThread().isInterrupted())
    39                         tasksCancelledAtShutdown.add(runnable);//记录被取消的任务
    40                 }
    41             }
    42         });
    43 }
    44 // 将ExecutorService 中的其他方法委托到exec
    45 }

    三、处理非正常的线程终止

      在一个线程中启动另一个线程,另一个线程中抛出异常,如果没有捕获它,这个异常也不会传递到父线程中

      任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常

     1 //如果任务抛出了一个运行时异常,它将允许线程终结,但是会首先通知框架:线程已经终结
     2 public void run() {//工作者线程的实现
     3     Throwable thrown = null;
     4     try {
     5         while (!isInterrupted())
     6             runTask(getTaskFromWorkQueue());
     7     } catch (Throwable e) {//为了安全,捕获的所有异常
     8         thrown = e;//保留异常信息
     9     } finally {
    10         threadExited(this, thrown);// 重新将异常抛给框架后终结工作线程
    11     }
    12 }

    未捕获异常的线程

    在Thread API中提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况

    在运行时间较长的应用程序中,通常会为所有的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

    public class UEHLogger implements Thread.UncaughtExceptionHandler {
        public void uncaughtException(Thread t, Throwable e) {
            Logger logger = Logger.getAnonymousLogger();
            logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
        }
    }

    四、JVM关闭

    JVM既可通过正常手段来关闭,也可强行关闭。

    • 正常关闭:当最后一个“正常(非守护)”线程结束时、当有人调用了System.exit时、或者通过其他特定于平台的方法关闭时
    • 强行关闭:Runtime.halt,这种强行关闭方式将无法保证是否将运行关闭钩子

    1、关闭钩子

    • 关闭钩子是指通过Runnable.addShutdownHook注册的但尚未开始的线程
    • JVM并不能保证关闭钩子的调用顺序
    • 当所有的关闭钩子都执行结束时,如果runFinalizersOnExit为true,那么JVM将运行终结器(finalize),然后再停止
    • JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子
    • 关闭钩子应该是线程安全的
    • 关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间
    public void start()//通过注册关闭钩子,停止日志服务
    {
        Runnable.getRuntime().addShutdownHook(new Thread(){
            public void run()
            {
                try{LogService.this.stop();}
                catch(InterruptedException ignored){}
            }
        });
    }

    2、守护线程——一个线程来执行一些辅助工作,但有不希望这个线程阻碍JVM的关闭

      线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程

      普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出

    3、终结器(清理文件句柄或套接字句柄等)——避免使用

      垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而确保一些持久化的资源被释放。

      通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源

    例外:当需要管理对象时,并且该对象持有的资源是通过本地方法获得的

  • 相关阅读:
    052 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 14 Eclipse下程序调试——debug2 多断点调试程序
    051 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 13 Eclipse下程序调试——debug入门1
    050 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 12 continue语句
    049 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 11 break语句
    048 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 10 案例——阶乘的累加和
    047 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 09 嵌套while循环应用
    046 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 08 for循环的注意事项
    045 01 Android 零基础入门 01 Java基础语法 05 Java流程控制之循环结构 07 for循环应用及局部变量作用范围
    剑指OFFER----面试题04.二维数组中的查找
    剑指OFFER----面试题03. 数组中重复的数字
  • 原文地址:https://www.cnblogs.com/HectorHou/p/6034274.html
Copyright © 2011-2022 走看看