zoukankan      html  css  js  c++  java
  • java线程池的拒绝策略

    转载: java线程池的拒绝策略

    一、为什么要自定义线程池

    阿里规范中对于线程、线程池的规定

    《阿里巴巴 Java开发手册》1.6并发处理

    第3条规定:线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

    第4条规定:线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能让编写代码的攻城狮更加明确线程池的运行规则,规避资源耗尽(OOM)的风险

    之所以会出现这样的规范,是因为jdk已经封装好的线程池存在潜在风险:

    • FixedThreadPool 和 SingleThreadPool:
      允许的请求队列长度为 Integer.MAX_VALUE ,会堆积大量请求OOM

    • CachedThreadPool 和 ScheduledThreadPool:
      允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量线程OOM

    所以从系统安全角度出发,原则上都应该自己手动创建线程池

    二、如何自定义线程池

    ThreadPoolExecutor 有多个重载的构造函数。这里使用参数最多的一个简要说明自定义线程池的关键参数。

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 

    其实自定义线程池很简便,就这么几个规则

    • 线程池的线程数量长期维持在 corePoolSize 个(核心线程数量)
    • 线程池的线程数量最大可以扩展到 maximumPoolSize 个
    • 在 corePoolSize ~ maximumPoolSize 这个区间的线程,一旦空闲超过keepAliveTime时间,就会被杀掉(时间单位)
    • 送来工作的线程数量超过最大数以后,送到 workQueue 里面待业
    • 待业队伍也满了,就按照事先约定的策略 RejectedExecutionHandler 给拒绝掉

    以下详细解析拒绝策略

    三、线程池的拒绝策略

    3-0、所有拒绝策略都实现了接口 RejectedExecutionHandler

    public interface RejectedExecutionHandler {
        /**
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         * @throws RejectedExecutionException if there is no remedy
         */
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }

    这个接口只有一个 rejectedExecution 方法。

    r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。

    3-1、AbortPolicy

    直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐

        public static class AbortPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
            }
        }

    3-2、CallerRunsPolicy

    在调用者线程中(也就是说谁把 r 这个任务甩来的),运行当前被丢弃的任务。

    只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。

    如果线程池已经被关闭,则直接丢弃该任务。

        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }

    这里有个小问题:r.run() 是如何做到使用调用者所在线程来运行任务的?

    参看:Thread的.start()与.run()的区别

    3-3、DiscardOledestPolicy

    丢弃队列中最老的,然后再次尝试提交新任务。

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }

    这里 e.getQueue() 是获得待执行的任务队列,也就是前面提到的待业队列。

    因为是队列,所以先进先出,一个poll()方法就能直接把队列中最老的抛弃掉,再次尝试执行execute(r)。

    这个队列在线程池定义的时候就能看到,是一个阻塞队列

        /**
         * The queue used for holding tasks and handing off to worker
         * threads.  We do not require that workQueue.
         */     
        private final BlockingQueue<Runnable> workQueue;
    
        public BlockingQueue<Runnable> getQueue() {
            return workQueue;
        }

    3-4、DiscardPolicy

    默默丢弃无法加载的任务。

    这个代码就很简单了,真的是啥也没做

        public static class DiscardPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

    3-5、通过实现 RejectedExecutionHandler 接口扩展

    jdk内置的四种拒绝策略(都在ThreadPoolExecutor.java里面)代码都很简洁易懂。

    我们只要继承接口都可以根据自己需要自定义拒绝策略。下面看两个例子。

    一是netty自己实现的线程池里面私有的一个拒绝策略。单独启动一个新的临时线程来执行任务。

        private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    final Thread t = new Thread(r, "Temporary task executor");
                    t.start();
                } catch (Throwable e) {
                    throw new RejectedExecutionException("Failed to start a new thread", e);
                }
            }
        }

    另外一个是dubbo的一个例子,它直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件

    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format("Thread pool is EXHAUSTED!" +
                            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                    url.getProtocol(), url.getIp(), url.getPort());
            logger.warn(msg);
            dumpJStack();
            throw new RejectedExecutionException(msg);
        }
    }
  • 相关阅读:
    OpenStack--Rabbitmq组件消息队列
    Redis-主从
    haproxy mycat mysql 读写分离MHA高可用
    mysql小白系列_14 线上故障分析与排错
    mysql小白系列_13 Online DDL
    mysql小白系列_12 sysbench
    mysql小白系列_11 MHA补充
    mysql小白系列_11 MHA
    mysql小白系列_10 mysql主从复制原理
    mysql小白系列_09 mysql性能优化关键点
  • 原文地址:https://www.cnblogs.com/brithToSpring/p/13322287.html
Copyright © 2011-2022 走看看