zoukankan      html  css  js  c++  java
  • Java并发编程实践读书笔记(4)任务取消和关闭

    任务的取消

    中断传递原理

    Java中没有抢占式中断,就是武力让线程直接中断。

    Java中的中断可以理解为就是一种简单的消息机制。某个线程可以向其他线程发送消息,告诉你“你应该中断了”。收到这条消息的线程可以根据这个消息做出反应。

    意思是,不是你说让我停我就会停,我愿意停就停!

    中断消息的传递其实就是通过Thread的一个布尔类型中断状态变量实现的。

    发送中断请求时,线程的中断变量被设置为true:

    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
           //... 
        }        
    });
    t.start();
    //...        
    t.interrupt();

    任务线程中监听这个状态变量就知道是否有中断请求:

    @Override
    public void run() {
         while(!Thread.currentThread().isInterrupted()) {
                        
         }         
    }

    或:

    @Override
    public void run() {
         while(!Thread.interrupted()) {
                        
         }         
    }

    注意:阻塞方法一般都会抛出InterruptedException,这个异常一旦被try-catch到了,Thread的interrupt状态就会被自动清除(设置为false)。如果不得不try-catch(比如在Runnable的run方法中,或其他类似接口方法中),但是又不知道应该怎么处理中断异常时,一定要记得恢复中断状态:

    Thread.currentThread().interrupt();//重新标记为已中断

    这样做的目的,是为了其他能够真的处理这个中断的地方能收到中断请求!

    反过来说就是,把InterruptedException异常try-cath到了,但是又什么都不做,就像是拦截了别人的信件,自己看完之后扔垃圾桶了。这种行为破坏了导致中断机制。

     Future的cancel

    Future的cancel包含两层意思,一个是如果任务还没启动那么就别启动了,另外一个意思是启动了就中断。

            try {
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }finally {
                future.cancel(true);
            }

    处理不可中断的任务

    JDK中很多阻塞方法都会考虑到中断,对于不可中断的任务,比如Socket连接,我们能做的是改写Thread的interrupt,来手动做一些事情让任务“中断”掉。

    public class ReaderThread extends Thread{
        
        private final Socket socket;
        private final InputStream in;
    
        public ReaderThread(Socket socket) throws IOException {
            super();
            this.socket = socket;
            this.in = socket.getInputStream();
        }
    
    
        @Override
        public void interrupt() {
            try {
                //关闭socket,与后面的try-catch配合使用
                this.socket.close();
            }catch(Exception e) {
            }finally{
                super.interrupt();
            }
        }
    
    
        @Override
        public void run() {
            byte[] buf = new byte[1024];
            try {
                while(true) {
                    int count = in.read(buf);//依赖socket.close异常跳出while
                    if(count<0) {
                        break;
                    }else if(count>0) {
                        processBuffer(buf,count);
                    }
                }
            }catch(Exception e) {
                
            }
        }
    
    
        private void processBuffer(byte[] buf, int count) {
            
        }
        
    }

    由此应该知道,在考虑中断的功能时,可以依赖Java的中断消息机制,但更多的是要根据实际的业务需求来合理利用中断机制。

    线程服务的取消

     如果我们对外提供了线程服务,那么就应该提供生命周期的方法,比如shutdown、cancel等。

    基于前面提到的知识,可以用中断机制来关闭任务线程。但更重要的是线程的关闭不能影响业务。

    比如WebLog应用,WebLog使用了一个阻塞队列来缓存WebLog日志,这样对个线程就能同时打日志了,另外有日志处理线程专门负责从队列中取日志来进行记录。这样一个简单的应用在取消的时候就不能盲目的“kill”掉,需要考虑:

    1,关闭后已经在阻塞队列中的日志还是应该继续处理完;

    2,不能继续接受新的日志存入到队列中了

    /**
     * @author huqiao
     */
    public class LogService {
        
        private final BlockingQueue<String> queue;
        private final LoggerThread loggerThread;
        private final PrintWriter writer;
        private boolean isStoped = false;
        private int logCount = 0;
        
        public LogService() {
            queue = new ArrayBlockingQueue<>(10);
            loggerThread = new LoggerThread();
            writer = new PrintWriter(System.out);
        }
        public void start() {
            loggerThread.start();
        }
        
        public void stop() {
            isStoped = true;
            loggerThread.interrupt();//实现queue.take()抛异常的效果,促使任务线程赶紧去查看“isStoped”状态
        }
        public void log(String log) throws InterruptedException {
            synchronized (this) {
                if(isStoped) {
                    //拒绝新日志写入
                    throw new IllegalStateException(" logger is stoped");
                }
                logCount++;
            }
            queue.put(log);
        }
        
        private class LoggerThread extends Thread{
    
            @Override
            public void run() {
                try {
                    while(true) {
                        try {
                            String log = null;
                            synchronized (LogService.this) {
                                if(isStoped && logCount == 0) {//logCount确保在stop之后还有机会处理接下来的日志
                                    break;
                                }
                            }
                            log = queue.take();
                            synchronized (LogService.this) {
                                logCount--;
                            }
                            writer.write(log);
                        } catch (InterruptedException e) {
                            //忽略异常(try-catch到之后,中断状态已经被标记为false了,再take时就不会出错了),因为已经自己实现了中断机制
                        }
                    }
                }finally {
                    writer.close();
                }
            }
        }
    
    }

    ExecutorService的关闭

    shutdown:仅仅表示ExecutorService不再接受新的提交请求,它不会给子任务发中断请求;

    shutdownNow:会尝试给任务线程发送中断请求,但如果子线程没有实现中断请求,那发了也没用,子线程照样一直跑;

    无论是shutdown还是shutdownNow,如果线程任务没有实现中断,那就根本不会停止。showdownNow会多返回一个List,里面存着还没执行的任务。所以在实现ExecutorService的关闭时,一定要有下面任务线程的配合。

    public class ExecutorShutdownTest {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            for(int i= 0;i<1;i++) {
                executorService.execute(new Task(i+""));
            }
            
            List<Runnable> taskList = executorService.shutdownNow();
            boolean successToTerminated = executorService.awaitTermination(10, TimeUnit.SECONDS);//这里等待10秒,能正常等待到任务线程退出
            System.out.println("successToTerminated=" + successToTerminated);
            for(Runnable runnable : taskList) {
                Task task = (Task)runnable;
                System.out.println(task.getName());
            }
        }
        
        private static class Task implements Runnable{
    
            private String name;
            public String getName() {
                return name;
            }
            public Task(String name) {
                this.name = name;
            }
            @Override
            public void run() {
                int i = 0;
                while(true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println("task " + name + " is running...");
                        if(++i>3) {//3秒之后正常退出
                            break;
                        }
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                    }
                }
            }
    
        }
        
    }

    毒丸对象

    书中介绍的这个毒丸对象是一种关闭子线程的巧妙办法。还是以前面的日志服务为例,客户代码想要子任务代码不要在继续跑了,那么可以给子任务扔一个“特殊的消息”。然后子任务会识别这个特殊的消息,让自己退出运行。

    处理非正常的线程终止

    线程服务中的多个线程有可能异常终止,我们应该要能即使的知道这样的事情。

    在创建ExecutorService的时候可以指定线程的创建工程(ThreadFactory):

    public class UnCatchExceptionHandlerTest {
        
        public static void main(String[] args) throws InterruptedException {
            ExecutorService service = Executors.newFixedThreadPool(10,new MyThreadFactory());
            service.execute(new Runnable() {
                @Override
                public void run() {
                    int i = 0;
                    System.out.println(100 / i);
                }
            });
        }
    
    }

    在这个线程工程中,可以为thread设置UncatchExceptionHandler:

    public class MyThreadFactory implements ThreadFactory{
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(new MyUncatchExceptionHandler());
            return t;
        }
    
    }
    public class MyUncatchExceptionHandler implements UncaughtExceptionHandler{
    
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.out.println(t.getName() + " occur exception:" + e.getMessage());
        }
    
    }

    注意,这个方法只对execute提交的任务有效,用submit提交的任务,其异常还是会被Future.get()封装在ExecutionException中重新抛出。

     

  • 相关阅读:
    Unix网络编程中的五种IO模型
    go工具库分析——go-snowflake
    defer
    滑动窗口
    快速幂
    Golang使用注意点
    微服务数据一致性基础认知
    KMP算法
    单调栈和单调队列
    LRU简单学习
  • 原文地址:https://www.cnblogs.com/at0x7c00/p/8166421.html
Copyright © 2011-2022 走看看