zoukankan      html  css  js  c++  java
  • 线程池面试题

    为什么要使用线程池?

    创建线程和销毁线程的花销是比较大的,这些时间有可能比处理业务的时间还要长。这样频繁的创建线程和销毁线程,再加上业务工作线程,消耗系统资源的时间,可能导致系统资源不足。(我们可以把创建和销毁的线程的过程去掉)

    线程池有什么作用?

    1、提高效率 创建好一定数量的线程放在池中,等需要使用的时候就从池中拿一个,这要比需要的时候创建一个线程对象要快的多。
    2、方便管理 可以编写线程池管理代码对池中的线程同一进行管理,比如说启动时有该程序创建100个线程,每当有请求的时候,就分配一个线程去工作,如果刚好并发有101个请求,那多出的这一个请求可以排队等候,避免因无休止的创建线程导致系统崩溃。

    说说几种常见的线程池及使用场景

    1、newSingleThreadExecutor
    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    2、newFixedThreadPool
    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    3、newCachedThreadPool
    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    4、newScheduledThreadPool
    创建一个定长线程池,支持定时及周期性任务执行。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    

    线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
    1)newFixedThreadPool和newSingleThreadExecutor:
      主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
    2)newCachedThreadPool和newScheduledThreadPool:
      主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
    Positive example 1:

     //org.apache.commons.lang3.concurrent.BasicThreadFactory
     ScheduledExecutorService executorService =
                    new ScheduledThreadPoolExecutor(1, 
                            new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()
                    );
    

    Positive example 2:

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("demo-pool-%d").build();
    
            //Common Thread Pool
            ExecutorService pool = new ThreadPoolExecutor(5, 200,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    
            pool.execute(()-> System.out.println(Thread.currentThread().getName()));
            pool.shutdown();//gracefully shutdown
    

    Positive example 3:

     <bean id="userThreadPool"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="100" />
            <property name="queueCapacity" value="2000" />
    
        <property name="threadFactory" value= threadFactory />
            <property name="rejectedExecutionHandler">
                <ref local="rejectedExecutionHandler" />
            </property>
        </bean>
        //in code
        userThreadPool.execute(thread);
    

    个人在项目中用到的是第三种,业务需求,每天会有调度服务器会通过http协议请求

    <bean id="xxDataThreadPool"
              class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <!-- 核心线程数 -->
            <property name="corePoolSize" value="50"/>
            <!-- 最大线程数 -->
            <property name="maxPoolSize" value="500"/>
            <!-- 队列最大长度 >=mainExecutor.maxSize -->
            <property name="queueCapacity" value="10"/>
            <!-- 线程池维护线程所允许的空闲时间 -->
            <property name="keepAliveSeconds" value="1"/>
            <!-- 线程池对拒绝任务(无线程可用)的处理策略 如果已经超过了限制丢弃消息,不进行处理 -->
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor$DiscardPolicy"/>
            </property>
        </bean>
    
    @Controller
    @RequestMapping("/windData")
    public class WindDataListener {
    
        private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");
    
        @Autowired
        private ThreadPoolTaskExecutor controlerThreadPool;
    
        @Autowired
        private ThreadPoolTaskExecutor windDataThreadPool;
    
        @Autowired
        private WindDataRuntimeService runtimeService;
    
        @Autowired
        private MaintainAlarmSender maintainAlarmSender;
    
    
        /**
         * 启动调度
         */
        @RequestMapping(value = "/receiveMsg", method = RequestMethod.GET)
        @ResponseBody
        public void receiveMsg() {
    
            final String paramLog = LogConst.BUSSINESS_NAME + LogConst.HTTP_API;
            logger.info("[{}][接收到调度消息]", paramLog);
    
            //定时调度,可能有多个http请求,把请求都放在controlerThreadPool里面
            controlerThreadPool.execute(new WindDataDispatcher(windDataThreadPool, runtimeService, maintainAlarmSender,
                    MDC.getCopyOfContextMap()));
    
            logger.info("[{}][响应给调度系统]", paramLog);
        }
    }
    
    public class WindDataDispatcher implements Runnable {
    
        private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");
    
        private ThreadPoolTaskExecutor taskThreadPool;
    
        private WindDataRuntimeService runtimeService;
    
        private MaintainAlarmSender maintainAlarmSender;
    
        private Map<Object, Object> mdcMap;
    
        public WindDataDispatcher(ThreadPoolTaskExecutor taskThreadPool, WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, Map<Object, Object> mdcMap) {
            this.taskThreadPool = taskThreadPool;
            this.runtimeService = runtimeService;
            this.maintainAlarmSender = maintainAlarmSender;
            this.mdcMap = mdcMap;
        }
    
    
        @Override
        public void run() {
            if (null != mdcMap) {
                MDC.setContextMap(mdcMap);
            }
            final String paramLog = LogConst.BUSSINESS_NAME + LogConst.DISPATCHER;
            logger.info("[{}启动]", paramLog);
            taskThreadPool.execute(new WindDataExecutor(runtimeService, maintainAlarmSender, mdcMap));
            logger.info("[{}结束]", paramLog);
        }
    }
    
    public class WindDataExecutor implements Runnable {
    
        private final static ThLogger logger = ThLoggerFactory.getLogger("WindDataDispatcher");
    
        private WindDataRuntimeService runtimeService;
    
        private MaintainAlarmSender maintainAlarmSender;
    
        private Map<Object, Object> mdcMap;
    
        public WindDataExecutor(WindDataRuntimeService runtimeService, MaintainAlarmSender maintainAlarmSender, Map<Object, Object> mdcMap) {
            this.runtimeService = runtimeService;
            this.maintainAlarmSender = maintainAlarmSender;
            this.mdcMap = mdcMap;
        }
    
        @Override
        public void run() {
            if (null != mdcMap) {
                MDC.setContextMap(mdcMap);
            }
            final String paramLog = LogConst.BUSSINESS_NAME + LogConst.EXECUTOR;
            logger.info("[{}启动]", paramLog);
            try {
                runtimeService.groundWindData();
            } catch (Exception e) {
                logger.error("[{}异常]{}", new Object[]{paramLog, e});
                maintainAlarmSender.sendMail(MaintainAlarmSender.DEFAULT_MAIL_SUB, paramLog + "异常:" + e);
            }
            logger.info("[{}结束]", paramLog);
        }
    
    }
    

    线程池都有哪几种工作队列

    1、ArrayBlockingQueue
    是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    2、LinkedBlockingQueue
    一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    3、SynchronousQueue
    一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    4、PriorityBlockingQueue
    一个具有优先级的无限阻塞队列

    线程池中的几种重要的参数及流程说明

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    

    workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue
    LinkedBlockingQueue
    SynchronousQueue
    PriorityBlockingQueue
    

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和SynchronousQueue。线程池的排队策略与BlockingQueue有关。

    threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等

    handler:表示当拒绝处理任务时的策略,有以下四种取值:

    1、AbortPolicy:直接抛出异常。
    2、CallerRunsPolicy:只用调用者所在线程来运行任务。
    3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    4、DiscardPolicy:不处理,丢弃掉。
    5、也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
       /**
         * A handler for rejected tasks that runs the rejected task
         * directly in the calling thread of the {@code execute} method,
         * unless the executor has been shut down, in which case the task
         * is discarded.
         */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
        /**
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}.
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
        /**
         * A handler for rejected tasks that silently discards the
         * rejected task.
         */
        public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
        /**
         * A handler for rejected tasks that discards the oldest unhandled
         * request and then retries {@code execute}, unless the executor
         * is shut down, in which case the task is discarded.
         */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    

    ThreadPoolExecutor 源码理解 https://www.cnblogs.com/dolphin0520/p/3932921.html

     public static void test(int size) {
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 20, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
    
            for (int i = 0; i < size; i++) {
                poolExecutor.execute(new DemoTask(i));
    
    
                Console.log("poolSize:" + poolExecutor.getPoolSize());
                Console.log("corePoolSize:" + poolExecutor.getCorePoolSize());
                Console.log("maximumPoolSize:" + poolExecutor.getMaximumPoolSize());
                Console.log("queue:" + poolExecutor.getQueue().size());
                Console.log("completedTaskCount:" + poolExecutor.getCompletedTaskCount());
                Console.log("largestPoolSize:" + poolExecutor.getLargestPoolSize());
                Console.log("keepAliveTime:" + poolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
    
            }
    
            poolExecutor.shutdown();
        }
    
    class DemoTask implements Runnable {
    
        private int taskNum;
    
        public DemoTask(int taskNum) {
            this.taskNum = taskNum;
        }
    
        @Override
        public void run() {
            Console.log(StringUtils.center("正在执行" + taskNum, 20, "="));
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Console.log(StringUtils.center("执行完毕" + taskNum, 20, "="));
        }
    }
    
    =======正在执行0========
    poolSize:1
    corePoolSize:5
    maximumPoolSize:20
    queue:0
    completedTaskCount:0
    largestPoolSize:1
    keepAliveTime:2
    poolSize:2
    corePoolSize:5
    maximumPoolSize:20
    queue:0
    completedTaskCount:0
    =======正在执行1========
    largestPoolSize:2
    keepAliveTime:2
    poolSize:3
    corePoolSize:5
    maximumPoolSize:20
    =======正在执行2========
    queue:0
    completedTaskCount:0
    largestPoolSize:3
    keepAliveTime:2
    poolSize:4
    corePoolSize:5
    maximumPoolSize:20
    queue:0
    =======正在执行3========
    completedTaskCount:0
    largestPoolSize:4
    keepAliveTime:2
    poolSize:5
    corePoolSize:5
    =======正在执行4========
    maximumPoolSize:20
    queue:0
    completedTaskCount:0
    largestPoolSize:5
    keepAliveTime:2
    poolSize:5
    corePoolSize:5
    maximumPoolSize:20
    queue:1
    completedTaskCount:0
    largestPoolSize:5
    keepAliveTime:2
    poolSize:5
    corePoolSize:5
    maximumPoolSize:20
    queue:2
    completedTaskCount:0
    largestPoolSize:5
    keepAliveTime:2
    poolSize:5
    corePoolSize:5
    maximumPoolSize:20
    queue:3
    completedTaskCount:0
    largestPoolSize:5
    keepAliveTime:2
    poolSize:5
    corePoolSize:5
    maximumPoolSize:20
    queue:4
    completedTaskCount:0
    largestPoolSize:5
    keepAliveTime:2
    poolSize:5
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    completedTaskCount:0
    largestPoolSize:5
    keepAliveTime:2
    poolSize:6
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    completedTaskCount:0
    largestPoolSize:6
    keepAliveTime:2
    poolSize:7
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    completedTaskCount:0
    largestPoolSize:7
    keepAliveTime:2
    =======正在执行11=======
    poolSize:8
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    completedTaskCount:0
    =======正在执行12=======
    =======正在执行10=======
    largestPoolSize:8
    keepAliveTime:2
    poolSize:9
    corePoolSize:5
    =======正在执行13=======
    maximumPoolSize:20
    queue:5
    completedTaskCount:0
    largestPoolSize:9
    keepAliveTime:2
    poolSize:10
    corePoolSize:5
    maximumPoolSize:20
    =======正在执行14=======
    queue:5
    completedTaskCount:0
    largestPoolSize:10
    keepAliveTime:2
    poolSize:11
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    =======正在执行15=======
    completedTaskCount:0
    largestPoolSize:11
    keepAliveTime:2
    poolSize:12
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    completedTaskCount:0
    =======正在执行16=======
    largestPoolSize:12
    keepAliveTime:2
    poolSize:13
    corePoolSize:5
    maximumPoolSize:20
    =======正在执行17=======
    queue:5
    completedTaskCount:0
    largestPoolSize:13
    keepAliveTime:2
    poolSize:14
    corePoolSize:5
    maximumPoolSize:20
    queue:5
    =======正在执行18=======
    completedTaskCount:0
    largestPoolSize:14
    keepAliveTime:2
    poolSize:15
    corePoolSize:5
    maximumPoolSize:20
    =======正在执行19=======
    queue:5
    completedTaskCount:0
    largestPoolSize:15
    keepAliveTime:2
    =======执行完毕0========
    =======正在执行5========
    =======执行完毕1========
    =======执行完毕2========
    =======正在执行6========
    =======正在执行7========
    =======执行完毕4========
    =======正在执行8========
    =======执行完毕3========
    =======正在执行9========
    =======执行完毕13=======
    =======执行完毕12=======
    =======执行完毕10=======
    =======执行完毕11=======
    =======执行完毕15=======
    =======执行完毕16=======
    =======执行完毕14=======
    =======执行完毕19=======
    =======执行完毕18=======
    =======执行完毕17=======
    =======执行完毕5========
    =======执行完毕7========
    =======执行完毕6========
    =======执行完毕8========
    =======执行完毕9========
    

    怎么理解无界队列和有界队列

    有界队列
    1.初始的poolSize < corePoolSize,提交的runnable任务,会直接做为new一个Thread的参数,立马执行 。
    2.当提交的任务数超过了corePoolSize,会将当前的runable提交到一个block queue中。
    3.有界队列满了之后,如果poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,立马执行对应的runnable任务。
    4.如果3中也无法处理了,就会走到第四步执行reject操作。
    无界队列
    与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若后续仍有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。

    如何合理配置线程池的大小

    https://www.zhihu.com/question/38128980

  • 相关阅读:
    ::selection
    为什么要web语义化
    label的for属性与inputde的id元素绑定
    a 标签
    IE6 双倍距BUG
    css样式书写顺序
    清除浮动 .clearfix
    Unity利用AnimationCurve做物体的各种运动
    Error: unknown argument: '-websockets'
    关闭VAX的拼写检查_解决中文红色警告问题
  • 原文地址:https://www.cnblogs.com/Treesir/p/11763399.html
Copyright © 2011-2022 走看看