zoukankan      html  css  js  c++  java
  • 停止基于服务的线程

    停止基于服务的线程

      应用程序通常会创建拥有服务的线程, 比如线程池. 这些服务的存在时间通常要比创建他们的方法存在的时间更长, 如果应用程序优雅的退出了,这些服务的线程也需要结束.因为没有退出线程惯用的优先方法, 他们需要自行结束.

      明智的封装实践指出,你不应该操控某个线程一一中断它,改变他的优先级,等等... 除非是这个.线程的拥有者, 线程API没有关于线程所属权正规的概念. 线程通过一个Thread对象表示,和其他对象一样可以被自由的共享. 但是, 认为线程有一个拥有者是有道理的. 这个拥有者就是创建这个线程的类. 所有线程池拥有它的工作线程, 如果需要中断这些线程. 那么应该由线程池来负责

      例如: ExecutorService 提供的 shutdown 和 shutdownNow 方法.

      对于线程持有的服务, 只要服务的存在时间大于创建线程的方法存在时间,那么就应该提供生命周期方法(lifecycle method)

    实例: 日志服务

    public class LogService {
        private final BlockingQueue<String> queue;
        private final LoggerThread loggerThread;
        private final PrintWriter writer;
        private boolean isShutdown;
        private int reservations;
    
        public LogService(Writer writer) {
            this.queue = new LinkedBlockingQueue<String>();
            this.loggerThread = new LoggerThread();
            this.writer = new PrintWriter(writer);
        }
    
        public void start() {
            loggerThread.start();
        }
    
        public void stop() {
            synchronized (this) {
                isShutdown = true;
            }
            loggerThread.interrupt();
        }
    
        public void log(String msg) throws InterruptedException {
            synchronized (this) {
                if (isShutdown)
                    throw new IllegalStateException(/* ... */);
                ++reservations;
            }
            queue.put(msg);
        }
    
        private class LoggerThread extends Thread {
            public void run() {
                try {
                    while (true) {
                        try {
                            synchronized (LogService.this) {
                                if (isShutdown && reservations == 0)
                                    break;
                            }
                            String msg = queue.take();
                            synchronized (LogService.this) {
                                --reservations;
                            }
                            writer.println(msg);
                        } catch (InterruptedException e) { /* retry */
                        }
                    }
                } finally {
                    writer.close();
                }
            }
        }
    }

    或者使用 ExecutorService 

    public class LogService2 {
    
        private final ExecutorService executorService = Executors
                .newSingleThreadExecutor();
        private final PrintWriter writer;
    
        public LogService2(PrintWriter writer) {
            super();
            this.writer = writer;
        }
    
        public void start() {
        }
    
        public void close() {
            try {
    
                this.executorService.shutdown();
                this.executorService.awaitTermination(TIME_OUT, UNIT);
            } finally {
                if (writer != null)
                    writer.close();
            }
        }
    
        public void log(String str) {
            this.executorService.execute(new Writer(str));
        }
    
    }

    实例: 致命药丸

    public class IndexingService {
        private static final int CAPACITY = 1000;
        private static final File POISON = new File("");
        private final IndexerThread consumer = new IndexerThread();
        private final CrawlerThread producer = new CrawlerThread();
        private final BlockingQueue<File> queue;
        private final FileFilter fileFilter;
        private final File root;
    
        public IndexingService(File root, final FileFilter fileFilter) {
            this.root = root;
            this.queue = new LinkedBlockingQueue<File>(CAPACITY);
            this.fileFilter = new FileFilter() {
                public boolean accept(File f) {
                    return f.isDirectory() || fileFilter.accept(f);
                }
            };
        }
    
        private boolean alreadyIndexed(File f) {
            return false;
        }
    
        class CrawlerThread extends Thread {
            public void run() {
                try {
                    crawl(root);
                } catch (InterruptedException e) { /* fall through */
                } finally {
                    while (true) {
                        try {
                            queue.put(POISON);
                            break;
                        } catch (InterruptedException e1) { /* retry */
                        }
                    }
                }
            }
    
            private void crawl(File root) throws InterruptedException {
                File[] entries = root.listFiles(fileFilter);
                if (entries != null) {
                    for (File entry : entries) {
                        if (entry.isDirectory())
                            crawl(entry);
                        else if (!alreadyIndexed(entry))
                            queue.put(entry);
                    }
                }
            }
        }
    
        class IndexerThread extends Thread {
            public void run() {
                try {
                    while (true) {
                        File file = queue.take();
                        if (file == POISON)
                            break;
                        else
                            indexFile(file);
                    }
                } catch (InterruptedException consumed) {
                }
            }
    
            public void indexFile(File file) {
                /* ... */
            };
        }
    
        public void start() {
            producer.start();
            consumer.start();
        }
    
        public void stop() {
            producer.interrupt();
        }
    
        public void awaitTermination() throws InterruptedException {
            consumer.join();
        }
    
        public static void main(String[] args) {
            IndexingService is =new IndexingService(new File("e://"), new FileFilter() {
    
                public boolean accept(File pathname) {
                    System.out.println(pathname);
                    return true;
                }
            });
            is.start();
            is.stop();
            try {
                is.awaitTermination();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("the end ");
        }
    }

      致命药丸只有在生产线程与消费线程已知的情况下才能使用. IndexingService中的解决方案也可以被扩展到多生产者, 只要让每一个生产线程都往队列里放入一个药丸, 并且消费线程收到第 N(生产者的数量)个药丸时停止. 

    实例: 只执行一次的服务

    public class CheckForMail {
        public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
                throws InterruptedException {
            ExecutorService exec = Executors.newCachedThreadPool();
            final AtomicBoolean hasNewMail = new AtomicBoolean(false);
            try {
                for (final String host : hosts)
                    exec.execute(new Runnable() {
                        public void run() {
                            if (checkMail(host))
                                hasNewMail.set(true);
                        }
                    });
            } finally {
                exec.shutdown();
                exec.awaitTermination(timeout, unit);
            }
            return hasNewMail.get();
        }
    
        private boolean checkMail(String host) {
            // Check for mail
            return false;
        }
    }
  • 相关阅读:
    Delphi的 Format格式化函数
    Delphi的Trim函数
    Delphi容器类之---Tlist,TStringlist,THashedStringlist的效率比较
    Delphi容器类之---TOrderedList、TStack、TQueue、TObjectStack、TObjectQueue
    Delphi容器类之---TList、TObjectList、TComponentList、TClassList
    Delphi的分配及释放---New/Dispose, GetMem/FreeMem及其它函数的区别与相同
    Delphi容器类之---TList、TStringList、TObjectList,以及一个例程的代码分析
    Linux C编程学习6---字符串处理、数据转换
    Linux C编程学习之开发工具3---多文件项目管理、Makefile、一个通用的Makefile
    Linux C编程学习之开发工具2---GDB调试器
  • 原文地址:https://www.cnblogs.com/mjorcen/p/4239858.html
Copyright © 2011-2022 走看看