zoukankan      html  css  js  c++  java
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性

     
     
    概述
     

    第1 部分 问题引入

      当通过 shutdownNow  来强行关闭 ExecutorService 时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。

      然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着这我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需知道当 Executor 关闭时哪些任务正在执行。

    第2 部分 实例

      在下面程序 TrackingExecutor 中给出了如何在关闭过程中判断正在执行的任务。通过封装 ExecutorService 并使得execute 记录哪些任务是在关闭后取消的,TrackingExecutor 可以找出哪些任务已经开始但还没有正常完成。在 Executor 结束后,getCancelledTasks 返回被取消的任务清单。

      1 /**
      2  * 7.21 在 ExecutorService 中跟踪在关闭之后取消的任务
      3  * @ClassName: TrackingExecutor
      4  * @author xingle
      5  * @date 2014-11-12 下午8:39:33
      6  */
      7 public class TrackingExecutor extends AbstractExecutorService{
      8     private final ExecutorService exec;
      9     private final Set<Runnable> tasksCancelledAtShutdown = Collections
     10             .synchronizedSet(new HashSet<Runnable>());
     11     
     12     public TrackingExecutor(ExecutorService exec){
     13         this.exec = exec;
     14     }
     15     
     16     public List<Runnable> getCancelledTasks(){
     17         if(!exec.isTerminated())
     18             throw new IllegalStateException();
     19         return new ArrayList<Runnable>(tasksCancelledAtShutdown);
     20     }
     21     
     22     /**
     23      * 
     24      * @Description: TODO
     25      * @param command
     26      * @author xingle
     27      * @data 2014-11-13 上午9:06:56
     28      */
     29     @Override
     30     public void execute(final Runnable runnable) {
     31         exec.execute(new Runnable() {
     32             
     33             @Override
     34             public void run() {
     35                 try{
     36                     runnable.run();
     37                 }finally{
     38                     if(isShutdown() && Thread.currentThread().isInterrupted())
     39                         tasksCancelledAtShutdown.add(runnable);
     40                 }
     41             }
     42         });
     43     }
     44 
     45     /**
     46      * 下面将ExecutorService 的其他方法委托给 exec
     47      */
     48         
     49     /**
     50      * 
     51      * @Description: TODO
     52      * @author xingle
     53      * @data 2014-11-13 上午9:06:56
     54      */
     55     @Override
     56     public void shutdown() {
     57         exec.shutdown();
     58     }
     59 
     60     /**
     61      * 
     62      * @Description: TODO
     63      * @return
     64      * @author xingle
     65      * @data 2014-11-13 上午9:06:56
     66      */
     67     @Override
     68     public List<Runnable> shutdownNow() {
     69         return exec.shutdownNow();
     70     }
     71 
     72     /**
     73      * 
     74      * @Description: TODO
     75      * @return
     76      * @author xingle
     77      * @data 2014-11-13 上午9:06:56
     78      */
     79     @Override
     80     public boolean isShutdown() {
     81         return exec.isShutdown();
     82     }
     83 
     84     /**
     85      * 
     86      * @Description: TODO
     87      * @return
     88      * @author xingle
     89      * @data 2014-11-13 上午9:06:56
     90      */
     91     @Override
     92     public boolean isTerminated() {
     93         return exec.isTerminated();
     94     }
     95 
     96     /**
     97      * 
     98      * @Description: TODO
     99      * @param timeout
    100      * @param unit
    101      * @return
    102      * @throws InterruptedException
    103      * @author xingle
    104      * @data 2014-11-13 上午9:06:56
    105      */
    106     @Override
    107     public boolean awaitTermination(long timeout, TimeUnit unit)
    108             throws InterruptedException {
    109         return exec.awaitTermination(timeout, unit);
    110     }
    111 
    112 
    113 }

      在程序 WebCrawler 中给出了 TrackingExecutor 的用法。网页爬虫程序的工作通常是无穷尽的,因此当爬虫程序必须关闭时,我们通常希望保持它的状态,以便稍后重启动。CrawlTask 提供了一个 getPage 方法,该方法能找出正在处理的页面。当爬虫程序关闭时,无论是还没有开始的任务,还是那些被取消的任务,都将记录他们的URL,因此当爬虫程序程序启动时,就可以将这些URL 的页面抓取任务加入到任务队列中。

      1 /**
      2  * 7.22 使用TrackingExecutorService 来保存未完成的任务以备后续执行
      3  * @ClassName: WebCrawler
      4  * TODO
      5  * @author xingle
      6  * @date 2014-11-13 上午9:17:54
      7  */
      8 public abstract class WebCrawler {
      9     private volatile TrackingExecutor exec;
     10     @GuardedBy("this")
     11     public final Set<URL> urlsToCrawl = new HashSet<URL>();
     12 
     13     private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
     14     private static final long TIMEOUT = 500;
     15     private static final TimeUnit UNIT = TimeUnit.MICROSECONDS;
     16     
     17     public WebCrawler(URL startUrl){
     18         urlsToCrawl.add(startUrl);
     19     }
     20     
     21     public synchronized void start(){
     22         exec = new TrackingExecutor(Executors.newCachedThreadPool());
     23         for (URL url: urlsToCrawl)
     24             submitCrawlTask(url);
     25         urlsToCrawl.clear();
     26     }
     27     
     28     /**
     29      * 提交爬虫任务
     30      * @param url
     31      * @author xingle
     32      * @data 2014-11-13 上午9:46:01
     33      */
     34     private void submitCrawlTask(URL url) {
     35         exec.execute(new CrawlTask(url));
     36     }
     37     
     38     protected abstract List<URL> processPage(URL url);
     39     
     40     /**
     41      * 保存未完成的
     42      * @param urlsToCrawl
     43      * @author xingle
     44      * @data 2014-11-13 上午10:10:07
     45      */
     46     private void saveUncrawled(List<Runnable> uncrawled) {
     47         for (Runnable task:uncrawled){
     48             URL url = ((CrawlTask)task).getPage();
     49             System.out.println("保存未完成的URL:"+url);
     50             urlsToCrawl.add(url);    
     51         }
     52             
     53     }
     54     
     55     //爬虫任务
     56     private class CrawlTask implements Runnable{
     57         private final URL url;
     58         
     59         CrawlTask(URL url){
     60             this.url = url;
     61         }
     62         
     63         private int count = 1;
     64 
     65         boolean alreadyCrawled() {
     66             return seen.putIfAbsent(url, true) != null;
     67         }
     68 
     69         void markUncrawled() {
     70             seen.remove(url);
     71             System.out.printf("marking %s uncrawled%n", url);
     72         }
     73                 
     74         @Override
     75         public void run() {
     76             for (URL link :processPage(url)){
     77                 if(Thread.currentThread().isInterrupted())
     78                     return;
     79                 System.out.println("提交的爬虫url:"+link);
     80                 submitCrawlTask(link);
     81             }            
     82         }
     83         
     84         public URL getPage(){
     85             return url;
     86         }        
     87     }
     88     
     89     public synchronized void stop() throws InterruptedException{
     90         try {
     91             saveUncrawled(exec.shutdownNow());
     92             if (exec.awaitTermination(100, UNIT)){
     93                 saveUncrawled(exec.getCancelledTasks());
     94             }
     95                 
     96         } finally {
     97             exec = null;
     98         }
     99     }    
    100 }

    测试程序:

     1 public class WebCrawler_Main {
     2     
     3     public static void main(String[] args) throws MalformedURLException{
     4         WebCrawler webc = new WebCrawler(new URL("http://site.baidu.com/")) {
     5             
     6             @Override
     7             protected List<URL> processPage(URL url) {
     8                 //获取该url下所有的链接
     9                 //这里省略了该功能
    10                 List<URL> url2 = new ArrayList<URL>();
    11                 try {
    12                     url2.add(new URL("http://www.cnblogs.com/xingele0917/"));
    13                     //url2.add(new URL("http://www.zhihu.com/"));
    14                 } catch (MalformedURLException e) {
    15                     e.printStackTrace();
    16                 }
    17                 return url2;
    18                 
    19             }
    20             
    21         };
    22 
    23         webc.start();
    24         try {
    25             Thread.sleep(10);
    26             webc.stop();
    27         } catch (InterruptedException e) {
    28             e.printStackTrace();
    29         }
    30     }
    31 
    32 }

    执行结果:

  • 相关阅读:
    梯度方向问题
    switchsharp
    R语言学习笔记:sort、rank、order、arrange排序函数
    R语言学习笔记:choose、factorial、combn排列组合函数
    MySQL学习笔记:少用Null
    Oracle学习笔记:11g服务介绍及哪些服务必须开启?
    GreenPlum学习笔记:create or replace function创建函数
    Python学习笔记:出生日期转化为年龄
    Python学习笔记:import sys模块(argv、path、platform、exit)
    Oracle学习笔记:wm_concat函数合并字段
  • 原文地址:https://www.cnblogs.com/xingele0917/p/4094822.html
Copyright © 2011-2022 走看看