zoukankan      html  css  js  c++  java
  • 线程池使用拒绝策略时需要注意的坑

    转载自:
    http://ifeve.com/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E4%BD%BF%E7%94%A8futuretask%E6%97%B6%E5%80%99%E9%9C%80%E8%A6%81%E6%B3%A8%E6%84%8F%E7%9A%84%E4%B8%80%E7%82%B9%E4%BA%8B/

      线程池使用FutureTask的时候如果拒绝策略设置为了DiscardPolicy和DiscardOldestPolicy并且在被拒绝的任务的Future对象上调用无参get方法那么调用线程会一直被阻塞。

    问题复现

        public static void main(String[] args) throws Exception {
    
            //(1)线程池单个线程,线程池队列元素个数为1
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
                    1L, TimeUnit.MINUTES,
                    new ArrayBlockingQueue<>(1),
                    new ThreadPoolExecutor.DiscardPolicy());
    
            //(2)添加任务one
            Future futureOne = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable one");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            //(3)添加任务two
            Future futureTwo = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable two");
                }
            });
    
            //(4)添加任务three
            Future futureThree = null;
            try {
                futureThree = executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("start runable three");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(e.getLocalizedMessage());
            }
    
            System.out.println("task one finish " + futureOne.get());//(5)等待任务one执行完毕
            System.out.println("task two finish " + futureTwo.get());//(6)等待任务two执行完毕
            System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任务three执行完毕
    
            executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕
        }

      运行结果:

    start runable one
    task one finish null
    start runable two
    task two finish null

      创建了一个单线程并且队列元素个数为1的线程池,并且拒绝策略设置为了DiscardPolicy;先提交任务one,这个任务会使用唯一的一个线程进行执行,任务在打印 start runable one后会阻塞该线程5s;再向线程池提交了一个任务two,这时候会把任务two放入到阻塞队列;提交任务three时,由于队列已经满了则会触发拒绝策略丢弃任务three。
      从运行结果看在任务one阻塞的5s内,主线程执行到了代码(5)等待任务one执行完毕,当任务one执行完毕后代码(5)返回,主线程打印出task one finish null。之后线程池的唯一线程会去队列里面取出任务two并执行所以输出start runable two然后代码(6)会返回,这时候主线程输出task two finish null,然后执行代码(7)等待任务three执行完毕,从执行结果看代码(7)会一直阻塞不会返回。
      至此问题产生,如果把拒绝策略修改为DiscardOldestPolicy也会存在有一个任务的get方法一直阻塞只是现在是任务two被阻塞:

    start runable one
    task one finish null
    start runable three

      但是如果拒绝策略设置为默认的AbortPolicy则会抛出RejectedExecutionException并正常返回。

    问题分析

      要分析这个问题需要看下线程池的submit方法里面做了什么,submit方法代码如下:

        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }

      newTaskFor()把Runnable转为FutureTask对象,FutureTask实现RunnableFuture接口,继续跟execute:

    public void execute(Runnable command) {
            // ...
            // 如果线程个数消息核心线程数则新增处理线程处理
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 如果当前线程个数已经达到核心线程数则任务放入队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 尝试新增处理线程进行处理
            else if (!addWorker(command, false))
                reject(command);// 新增失败则调用拒绝策略
        }
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }

      再来看下拒绝策略DiscardPolicy的代码:

        public static class DiscardPolicy implements RejectedExecutionHandler {
            public DiscardPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

      这里rejectedExecution方法里面什么都没做,所以代码(4)调用submit后会返回一个future对象,即FutureTask:

    public class FutureTask<V> implements RunnableFuture<V> {
        /**
         * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
         */
        private volatile int state;
        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;

      state标识FutureTask的状态,初始状态是New。因此使用DiscardPolicy策略提交后返回了一个状态为NEW的FutureTask对象。
      那么下面就需要看下当调用future的无参get方法时候当future变为什么状态时候才会返回:

        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)  // 状态值为NORMAL正常返回
                return (V)x;
            if (s >= CANCELLED) // 状态值大于等于CANCELLED则抛异常
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }

      也就是说当future的状态>COMPLETING时候调用get方法才会返回,而明显DiscardPolicy策略在拒绝元素的时候并没有设置该future的状态,后面也没有其他机会可以设置该future的状态,所以future的状态一直是NEW,所以一直不会返回,同理DiscardOldestPolicy策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于future的状态,也会导致一直不会返回。
      那么默认的AbortPolicy策略为啥没问题那?来看AbortPolicy策略代码:

        public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }

      看代码就应该明白了吧。

      所以当使用Future的时候,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略在执行策略时候设置该Future的状态大于COMPLETING即可,但是查看FutureTask提供的方法发现只有cancel方法是public的并且可以设置FutureTask的状态大于COMPLETING,重写拒绝策略具体代码可以如下:

        public static void main(String[] args) throws Exception {
            //(1)线程池单个线程,线程池队列元素个数为1
            ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
                    1L, TimeUnit.MINUTES,
                    new ArrayBlockingQueue<>(1),
                    new ThreadPoolExecutor.DiscardPolicy() {
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                            if (!e.isShutdown()) {
                                if (r != null && r instanceof FutureTask) {
                                    ((FutureTask) r).cancel(true);
                                }
                            }
                        }
                    }
            );
    
            //(2)添加任务one
            Future futureOne = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable one");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            //(3)添加任务two
            Future futureTwo = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable two");
                }
            });
    
            //(4)添加任务three
            Future futureThree = null;
            try {
                futureThree = executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("start runable three");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(e.getLocalizedMessage());
            }
    
            try {
                System.out.println("task one finish " + futureOne.get());//(5)等待任务one执行完毕
                System.out.println("task two finish " + futureTwo.get());//(6)等待任务two执行完毕
                System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任务three执行完毕
            } catch (Exception e) {
                e.printStackTrace();
            }
            executorService.shutdown();//(8)关闭线程池,阻塞直到所有任务执行完毕
        }

      使用这个策略时候,由于report方法中对cancel的任务上会抛出CancellationException异常,所以在get()时使用try-catch捕获异常。运行后发现程序能正常退出。

  • 相关阅读:
    SQL注入: with rollup特性
    【转】kali配置--修改IP和DNS
    【转】getopt模块,实现获取命令行参数
    socket编程: TypeError: must be bytes or buffer, not str
    Ansible进阶之企业级应用
    Ansible之Playbook详解
    Ansible之常用模块介绍
    JAVA企业级应用Tomcat实战
    ubuntu网络、包管理、工作内容小结
    shell细节决定高度
  • 原文地址:https://www.cnblogs.com/jpfss/p/11671134.html
Copyright © 2011-2022 走看看