zoukankan      html  css  js  c++  java
  • 取消与关闭

      要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend等方法提供了这样的机制,但由于存在着一些严重的缺陷,因此应该避免使用)。但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

      如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的。取消某个操作的原因很多:

    • 用户请求取消。
    • 有时间限制的操作。
    • 应用程序事件。
    • 错误。
    • 关闭。

      在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

      一个可取消的任务必须拥有取消策略(Cancellation Policy),在这个策略中将详细地定义取消操作的“How”,“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在相应取消时应该执行哪些(What)操作。

      线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

      在Java的API或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

      阻塞库方法,例如Thread.sleep和Obje.wait等,都会检查线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。

      当线程在非阻塞状态时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得“有粘性”——如果不触发InterruptedException,那么中断状态将一直保持,直到明确地清除中断状态。

      调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

      对于中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时候中断自己。

      在使用静态的interrupted时应该小心,因为它会清楚当前线程的中断状态。如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态。

      通常,中断是实现取消的最合理的方式。

      正如任务中应该包含取消策略一样,线程同样应该包含中断策略。中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

      最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。

      任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。(当你为一户人家打扫房屋时,即使主人不在,也不应该把在这段时间内收到的杂志扔掉,而应该把邮件收起来,等主人回来以后再交给他们处理,尽管你可以阅读他们的杂志)。

      这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

      当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断请求,并直到某个更合适的时候。因此需要记住中断请求,并在完成当前任务后抛出InterruptedException或者表示已收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。

      当调用可中断的阻塞函数时,例如Thread.sleep或BlockingQueue.put等,有两种策略可用于处理InterruptedException:

    • 传递异常(可能在执行某个特定于任务的清楚操作之后),从而使你的方法也成为可中断的阻塞方法。
    • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

      传递InterruptedException与将InterruptedException添加到throws子句中一样容易。如果不想或者无法传递InterruptedException(或许通过Runnable来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用interrupt来恢复中断状态。

      只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

      对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获InterruptedException时恢复状态,如下所示,如果过早地设置中断状态,就可能引起无限循环。

      

    public Task getNextTask(BlockingQueue<Task> queue){
            boolean interrupted = false;
            try{
                while (true){
                    try{
                        return queue.take();
                    }catch (InterruptedException e){
                        interrupted = true;
                    }
                }
            }finally {
                if (interrupted){
                    Thread.currentThread().interrupt();
                }
            }
        }

       ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)如果mayInterruptIfRunning为true并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些不处理中断的任务中。

      除非你清楚线程的中断策略,否则不要中断线程,那么在什么情况下调用cancel可以将参数制定为true?执行任务的线程是由标准的Executor创建的,它实现了一种中断策略使得任务可以通过中断被取消,所以如果任务在标准的Executor中运行,并通过它们的Future来取消任务,那么可以设置mayInterruptIfRunning。当尝试取消某个任务时,不宜直接中断线程池,因为你并不知道当中断请求到达时正在运行什么任务——只能通过任务的Future来实现取消。

      当Future.get抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

      在Java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而,并非所有的可阻塞方法或者阻塞机制都能响应中断;如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。对于那么由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。

      当把一个Callable提交给ExecutorService时,submit方法会返回一个Future,我们可以通过这个Future来取消任务。newTaskFor是一个工厂方法,它将创建一个Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口扩展了Future和Runnable(并由FutureTask实现)。

       应用程序通常会创建拥有多个线程的服务,例如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。

      正确的封装原则是:除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等。在线程API中,并没有对线程所有权给出正式的定义:线程由Thread对象表示,并且像其他对象一样可以被自由共享。然而,线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。

      与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。

      对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

      

    public class LogService {
        private final BlockingQueue<String> queue;
        private final LoggerThread logger;
        private final PrintWriter writer;
        private boolean isShutdown;
        private int reservations;
    
        public LogService(BlockingQueue<String> queue, LoggerThread logger, PrintWriter writer, boolean shutdown, int reservations) {
            this.queue = queue;
            this.logger = logger;
            this.writer = writer;
            isShutdown = shutdown;
            this.reservations = reservations;
        }
    
        public void start(){
            logger.start();
        }
    
        public void stop(){
            synchronized (this){
                isShutdown = true;
            }
            logger.interrupt();
        }
    
        public void log(String msg ) throws InterruptedException{
            synchronized (this){
                if (isShutdown){
                    throw new IllegalStateException();
                }
                ++reservations;
            }
            queue.put(msg);
        }
    
        private class LoggerThread extends Thread{
            private final PrintWriter writer;
    
            private LoggerThread(PrintWriter writer) {
                this.writer = writer;
            }
    
            @Override
            public void run(){
                try{
                    while (true){
                        try{
                            synchronized (LogService.this){
                                if (isShutdown && reservations == 0){
                                    break;
                                }
                            }
                            String msg = queue.take();
                            synchronized (LogService.this){
                                --reservations;
                            }
                            writer.println(msg);
                        }catch (InterruptedException e){}
                    }
                }finally {
                    writer.close();
                }
            }
        }
    }

      简单的程序可以直接在main函数中启动和关闭全局的ExecutorService。而在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务能提供其自己的生命周期的方法,它将管理线程的工作委托给一个ExecutorService,而不是由其自行管理。通过封装ExecutorService,可以将所有权链从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

      另一种关闭生产者-消费者服务的方式就是使用“毒丸(Position Pill)”对象:“毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止”。在FIFO队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。

    public class IndexingService {
        private static final File POISON = new File("");
        private final IndexerThread consumer = new IndexerThread();
        private final CrawlerThread producer  = new CrawlerThread();
        private final BlockingQueue<File> queue;
        private final FileFilter fileFilter;
        private final File root;
    
        public IndexingService(BlockingQueue<File> queue, FileFilter fileFilter, File root) {
            this.queue = queue;
            this.fileFilter = fileFilter;
            this.root = root;
        }
        
        public void start(){
            producer.start();
            consumer.start();
        }
        
        public void stop(){
            producer.interrupt();
        }
        
        public void awaitTermiantion() throws InterruptedException{
            consumer.join();
        }
    
        class  IndexerThread extends Thread{
            
            public void run(){
                try{
                    while (true){
                        File file = queue.take();
                        if (file == POISON){
                            break;
                        }else {
                            indexFile(file);
                        }
                    }
                }catch (InterruptedException e){
    
                }
            }
            private void indexFile(File file){
    
            }
        }
    
        class CrawlerThread extends  Thread{
            public void run(){
                try{
                    crawl(root);
                }catch (InterruptedException e){
    
                }finally {
                    while (true){
                        try{
                            queue.put(POISON);
                            break;
                        }catch (InterruptedException e){
    
                        }
                    }
                }
            }
    
            private void crawl(File root) throws InterruptedException{
    
            }
        }
    }

      只有在生产者和消费者数量都已知的情况下,才可以使用“毒丸”对象。在IndexingService中采用的解决方案可以扩展到多个生产者:只需要每个生产者都向队列中放入一个“毒丸”对象,并且消费这仅当在接收到Nproduces个“毒丸”对象时才停止。这种方法也可以扩展到多个消费者的情况,只需要生产者将Nconsumers个“毒丸”对象放入队列。然而,当生产者和消费者的数量较大时,这种方法将变得难以使用。只有在无界队列中,“毒丸”对象才能可靠的工作。

       如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。

    boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit) throws InterruptedException{
            ExecutorService exec = Executors.newCachedThreadPool();
            final AtomicBoolean hasNewMail = new AtomicBoolean(false);
            try{
                for (final String host:hosts){
                    exec.execute(new Runnable() {
                        @Override
                        public void run() {
                            if (checkMail(host)){
                                hasNewMail.set(true);
                            }
                        }
                    });
                }
            }finally {
                exec.shutdown();
                exec.awaitTermination(timeout,unit);
            }
            return hasNewMail.get();
        }

      当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。

      然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当Executor关闭时哪些任务正在执行。

    public abstract class WebCrawler {
        private volatile TrackingExecutor exec;
        private final Set<URL> usrlsToCrawl = new HashSet<URL>();
        
        public synchronized void start(){
            exec = new TrackingExecutor(Executors.newCachedThreadPool());
            for (URL url:usrlsToCrawl){
                submitCrawlTask(url);
            }
            usrlsToCrawl.clear();
        }
        
        private synchronized void stop() throws InterruptedException{
            try {
                saveUncrawled(exec.shutdownNow());
                if (exec.awaitTermination(TIMEOUT, TimeUnit.DAYS)){
                    saveUncrawled(exec.getCancelledTask());
                }
            }finally {
                exec = null;
            }
        }
        
        protected abstract List<URL> processPage(URL url);
        
        private void submitCrawlTask(URL u){
            exec.execute(new CrawlTask(u));
        }
        
        private void saveUncrawled(List<Runnable> uncrawled){
            for (Runnable task:uncrawled){
                usrlsToCrawl.add(((CrawlTask)task).getPage());
            }
        }
        
        private class CrawlTask implements Runnable{
            private final URL url;
    
            private CrawlTask(URL url) {
                this.url = url;
            }
    
            public void run(){
                for (URL link:processPage(url){
                    if (Thread.currentThread().isInterrupted()){
                        return;
                    }
                    submitCrawlTask(url);
                }
            }
    
            private URL getPage() {
                return url;
            }
        }
    }

      

      

  • 相关阅读:
    Java Web 网络留言板2 JDBC数据源 (连接池技术)
    Java Web 网络留言板3 CommonsDbUtils
    Java Web ConnectionPool (连接池技术)
    Java Web 网络留言板
    Java Web JDBC数据源
    Java Web CommonsUtils (数据库连接方法)
    Servlet 起源
    Hibernate EntityManager
    Hibernate Annotation (Hibernate 注解)
    wpf控件设计时支持(1)
  • 原文地址:https://www.cnblogs.com/lsf90/p/3750629.html
Copyright © 2011-2022 走看看