zoukankan      html  css  js  c++  java
  • Java多线程-线程池ThreadPoolExecutor构造方法和规则

    为什么用线程池

    博客地址 http://blog.csdn.net/qq_25806863

    原文地址 http://blog.csdn.net/qq_25806863/article/details/71126867

    有时候,系统需要处理非常多的执行时间很短的请求,如果每一个请求都开启一个新线程的话,系统就要不断的进行线程的创建和销毁,有时花在创建和销毁线程上的时间会比线程真正执行的时间还长。而且当线程数量太多时,系统不一定能受得了。

    使用线程池主要为了解决一下几个问题:

    • 通过重用线程池中的线程,来减少每个线程创建和销毁的性能开销。
    • 对线程进行一些维护和管理,比如定时开始,周期执行,并发数控制等等。

    Executor

    Executor是一个接口,跟线程池有关的基本都要跟他打交道。下面是常用的ThreadPoolExecutor的关系。

    这里写图片描述

    Executor接口很简单,只有一个execute方法。

    ExecutorService是Executor的子接口,增加了一些常用的对线程的控制方法,之后使用线程池主要也是使用这些方法。

    AbstractExecutorService是一个抽象类。ThreadPoolExecutor就是实现了这个类。

    ThreadPoolExecutor

    构造方法

    ThreadPoolExecutor是线程池的真正实现,他通过构造方法的一系列参数,来构成不同配置的线程池。常用的构造方法有下面四个:

    这里写图片描述

    • ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) 
      • 1
      • 2
      • 3
      • 4
      • 5
    • ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory)
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler)
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

    构造方法参数说明

    • corePoolSize

      核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true

    • maximumPoolSize

      线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。

    • keepAliveTime

      非核心线程的闲置超时时间,超过这个时间就会被回收。

    • unit

      指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。

    • workQueue

      线程池中的任务队列.

      常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue

    • threadFactory

      线程工厂,提供创建新线程的功能。ThreadFactory是一个接口,只有一个方法

      public interface ThreadFactory {
        Thread newThread(Runnable r);
      }
      • 1
      • 2
      • 3

      通过线程工厂可以对线程的一些属性进行定制。

      默认的工厂:

      static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
      
        DefaultThreadFactory() {
            SecurityManager var1 = System.getSecurityManager();
            this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }
      
        public Thread newThread(Runnable var1) {
            Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            if(var2.isDaemon()) {
                var2.setDaemon(false);
            }
      
            if(var2.getPriority() != 5) {
                var2.setPriority(5);
            }
      
            return var2;
        }
      }
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
    • RejectedExecutionHandler

      RejectedExecutionHandler也是一个接口,只有一个方法

      public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
      }
      • 1
      • 2
      • 3

      当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。

    线程池规则

    线程池的线程执行规则跟任务队列有很大的关系。

    • 下面都假设任务队列没有大小限制:

      1. 如果线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中。
      2. 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
      3. 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
      4. 如果线程数量>核心线程数,并且>最大线程数,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
      5. 如果线程数量>核心线程数,并且>最大线程数,当任务队列是SynchronousQueue的时候,会因为线程池拒绝添加任务而抛出异常。
    • 任务队列大小有限时

      1. 当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
      2. SynchronousQueue没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。

    规则验证

    前提

    所有的任务都是下面这样的,睡眠两秒后打印一行日志:

    Runnable myRunnable = new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + " run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    };
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    所有验证过程都是下面这样,先执行三个,再执行三个,8秒后,各看一次信息

    executor.execute(myRunnable);
    executor.execute(myRunnable);
    executor.execute(myRunnable);
    System.out.println("---先开三个---");
    System.out.println("核心线程数" + executor.getCorePoolSize());
    System.out.println("线程池数" + executor.getPoolSize());
    System.out.println("队列任务数" + executor.getQueue().size());
    executor.execute(myRunnable);
    executor.execute(myRunnable);
    executor.execute(myRunnable);
    System.out.println("---再开三个---");
    System.out.println("核心线程数" + executor.getCorePoolSize());
    System.out.println("线程池数" + executor.getPoolSize());
    System.out.println("队列任务数" + executor.getQueue().size());
    Thread.sleep(8000);
    System.out.println("----8秒之后----");
    System.out.println("核心线程数" + executor.getCorePoolSize());
    System.out.println("线程池数" + executor.getPoolSize());
    System.out.println("队列任务数" + executor.getQueue().size());
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    验证1

    1. 核心线程数为6,最大线程数为10。超时时间为5秒

      ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 10, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
      • 1
      ---先开三个---
      核心线程数6
      线程池线程数3
      队列任务数0
      ---再开三个---
      核心线程数6
      线程池线程数6
      队列任务数0
      pool-1-thread-1 run
      pool-1-thread-6 run
      pool-1-thread-5 run
      pool-1-thread-3 run
      pool-1-thread-4 run
      pool-1-thread-2 run
      ----8秒之后----
      核心线程数6
      线程池线程数6
      队列任务数0
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    可以看到每个任务都是是直接启动一个核心线程来执行任务,一共创建了6个线程,不会放入队列中。8秒后线程池还是6个线程,核心线程默认情况下不会被回收,不收超时时间限制。

    验证2

    1. 核心线程数为3,最大线程数为6。超时时间为5秒,队列是LinkedBlockingDeque

      ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
      • 1
      ---先开三个---
      核心线程数3
      线程池线程数3
      队列任务数0
      ---再开三个---
      核心线程数3
      线程池线程数3
      队列任务数3
      pool-1-thread-3 run
      pool-1-thread-1 run
      pool-1-thread-2 run
      pool-1-thread-3 run
      pool-1-thread-1 run
      pool-1-thread-2 run
      ----8秒之后----
      核心线程数3
      线程池线程数3
      队列任务数0
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

      当任务数超过核心线程数时,会将超出的任务放在队列中,只会创建3个线程重复利用。

    验证3

    1. 核心线程数为3,最大线程数为6。超时时间为5秒,队列是SynchronousQueue
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    • 1
    ---先开三个---
    核心线程数3
    线程池线程数3
    队列任务数0
    ---再开三个---
    核心线程数3
    线程池线程数6
    队列任务数0
    pool-1-thread-2 run
    pool-1-thread-3 run
    pool-1-thread-6 run
    pool-1-thread-4 run
    pool-1-thread-5 run
    pool-1-thread-1 run
    ----8秒之后----
    核心线程数3
    线程池线程数3
    队列任务数0
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    当队列是SynchronousQueue时,超出核心线程的任务会创建新的线程来执行,看到一共有6个线程。但是这些线程是费核心线程,收超时时间限制,在任务完成后限制超过5秒就会被回收。所以最后看到线程池还是只有三个线程。

    验证4

    1. 核心线程数是3,最大线程数是4,队列是LinkedBlockingDeque

      ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
      • 1
    ---先开三个---
    核心线程数3
    线程池线程数3
    队列任务数0
    ---再开三个---
    核心线程数3
    线程池线程数3
    队列任务数3
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    ----8秒之后----
    核心线程数3
    线程池线程数3
    队列任务数0
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    LinkedBlockingDeque根本不受最大线程数影响。

    但是当LinkedBlockingDeque有大小限制时就会受最大线程数影响了

    4.1 比如下面,将队列大小设置为2.

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(2));
    • 1
    ---先开三个---
    核心线程数3
    线程池线程数3
    队列任务数0
    ---再开三个---
    核心线程数3
    线程池线程数4
    队列任务数2
    pool-1-thread-2 run
    pool-1-thread-1 run
    pool-1-thread-4 run
    pool-1-thread-3 run
    pool-1-thread-1 run
    pool-1-thread-2 run
    ----8秒之后----
    核心线程数3
    线程池线程数3
    队列任务数0
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    首先为三个任务开启了三个核心线程1,2,3,然后第四个任务和第五个任务加入到队列中,第六个任务因为队列满了,就直接创建一个新线程4,这是一共有四个线程,没有超过最大线程数。8秒后,非核心线程收超时时间影响回收了,因此线程池只剩3个线程了。

    4.2 将队列大小设置为1

    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1));
    • 1
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.sunlinlin.threaddemo.Main$1@677327b6 rejected from java.util.concurrent.ThreadPoolExecutor@14ae5a5[Running, pool size = 4, active threads = 4, queued tasks = 1, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at com.sunlinlin.threaddemo.Main.main(Main.java:35)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
    ---先开三个---
    核心线程数3
    线程池线程数3
    队列任务数0
    pool-1-thread-1 run
    pool-1-thread-2 run
    pool-1-thread-3 run
    pool-1-thread-4 run
    pool-1-thread-1 run
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    直接出错在第6个execute方法上。因为核心线程是3个,当加入第四个任务的时候,就把第四个放在队列中。加入第五个任务时,因为队列满了,就创建新线程执行,创建了线程4。当加入第六个线程时,也会尝试创建线程,但是因为已经达到了线程池最大线程数,所以直接抛异常了。

    验证5

    1. 核心线程数是3 ,最大线程数是4,队列是SynchronousQueue

      ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
      • 1
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.sunlinlin.threaddemo.Main$1@14ae5a5 rejected from java.util.concurrent.ThreadPoolExecutor@7f31245a[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at com.sunlinlin.threaddemo.Main.main(Main.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
    ---先开三个---
    核心线程数3
    线程池线程数3
    队列任务数0
    pool-1-thread-2 run
    pool-1-thread-3 run
    pool-1-thread-4 run
    pool-1-thread-1 run
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    这次在添加第五个任务时就报错了,因为SynchronousQueue各奔不保存任务,收到一个任务就去创建新线程。所以第五个就会抛异常了。

  • 相关阅读:
    第六节:流上下文
    第五节:控制序列化和反序列化的数据
    第四节:格式化器如何序列化类型实例
    第三节:控制序列化和反序列化
    第二节:使类型可序列化
    第一节:序列化和反序列化快速入门
    第五节:使用反射发现类型成员
    golang 一些坑 rang
    golang json格式字符串反序列化映射到切片结构体
    golang 结构体内嵌结构体序列化填充
  • 原文地址:https://www.cnblogs.com/zuoqun/p/9405625.html
Copyright © 2011-2022 走看看