zoukankan      html  css  js  c++  java
  • 线程池

    八、线程池

    ThreadPoolExecutor

    在之前的demo中,都是使用new Thread()手动创建线程池。但是在工作中使用的话,阿里巴巴编码规约明确说明,线程必须交给线程池来管理。避免资源耗尽的风险。

    传统的手动new的方式创建的线程,如果线程非常多的话,就会非常杂乱,无法管理。线程之间互相竞争资源,容易产生线程乱入的风险。线程如果非常多,会造成线程切换频繁,浪费cpu资源。线程多也会增加系统资源耗尽的风险。

    所以我们必须使用线程池管理。

    关于ThreadPoolExecutor

    先来查看ThreadPoolExecutor的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 
    

    构造方法包含的参数有:

    • corePoolSize:线程池的核心线程数
    • maximumPoolSize: 线程池最大线程数
    • keepAliveTime:空闲线程的存活时间
    • unit:空闲线程存活时间单位
    • workQueue:线程池等待队列。
    • threadFactory:线程工厂,生产线程的类。
    • handler:线程拒绝策略。当线程池满的时候,该怎么拒绝后来的线程。

    要想理解上面的参数。我们首先要理解线程池的工作原理,线程池的存在就是为了管理线程。池化思想的实现无非就是:节省资源,提速,资源复用,方便管理。线程池的实现也是为了方便管理线程,复用现有线程,节省线程资源开销,避免线程过多时cpu浪费大量时间在线程的切换上。

    当我们使用线程池执行任务时,会经历如下流程:

    image-20210120163025578

    上图就是线程池的大概的工作流程,看了上面的图,大概就对线程池的参数的意义有了大概的了解了。

    线程池的创建

    线程池的创建可以通过new ThreadPoolExecutor()来实现。

    package com.xiazhi.pool;
    
    
    import java.util.concurrent.*;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class ExecutorCreateDemo {
    
        public static void main(String[] args) {
    
            // 如果不指定threadFactory,会使用默认的线程工厂 Executors.defaultThreadFactory()
            // 如果不指定拒绝策略,会使用默认的拒绝策略 new AbortPolicy()
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4,
                    8,
                    60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(100));
        }
    }
    

    java通过Executors类默认实现了几个线程池:

    • Executors.newSingleThreadExecutor(): 这是一个单线程的线程池,实现方式为

      		public static ExecutorService newSingleThreadExecutor() {
              return new FinalizableDelegatedExecutorService
                  (new ThreadPoolExecutor(1, 1,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>()));
          }
      

      可以看到也是通过new ThreadPoolExecutor的方式实现,不过这个线程池的核心线程数和最大线程数都是1,因此里面永远只有一个线程。而且他的等待队列为 LinkedBlockingQueue, 这个队列是无界队列,当任务的生产速度大于消费速度时,队列就会不停的堆积,容易造成内存溢出。

      只有一个队列,为什么要使用线程池?

      可以使用线程池来管理这个线程的生命周期。

      使用方式:

        	@Test
          public void singleThreadExecutor() {
              ExecutorService executorService = Executors.newSingleThreadExecutor();
              executorService.execute(() -> System.out.println("hello world"));
          }
      
    • Executors.newFixedThreadPool(int size): 创建一个固定数量的线程池。实现为:

      		public static ExecutorService newFixedThreadPool(int nThreads) {
              return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>());
          }
      

      可以看到,在创建线程池时,核心线程数和最大线程数一样,而且使用的也是LinkedBlockingQueue,使用的弊端与上面一样,容易造成内存溢出。

      使用方式:

          @Test
          public void fixedThreadPool() {
              ExecutorService service = Executors.newFixedThreadPool(5);
              service.execute(() -> System.out.println("hello world"));
          }
      
    • Executors.newCachedThreadPool(): 创建一个缓存线程池。此线程池的实现方式为:

      		public static ExecutorService newCachedThreadPool() {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>());
          }
      

      可以看到这个线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,可以认为最大线程数是无界的。存活时间是60秒,如果线程超过60秒没有新的任务,那么就会被销毁。使用的是SynchronousQueue。分析队列为SynchronousQueue,也就以为着每次又一个任务到达时,就必须有一个线程处理这个任务,否则就会阻塞等待。那么当一个任务到达时,如果没有空闲线程,就会不断的创建新的线程,而最大线程数为无界,当任务生产速度大于消费速度时就会增加资源耗尽的风险。而且线程数比较大时也会增加上下文切换的开销,最终造成cpu的时间全都浪费在线程切换上,最终反而降低性能。

      使用方式:

          @Test
          public void cachedThreadPool() {
              ExecutorService executorService = Executors.newCachedThreadPool();
              executorService.execute(() -> System.out.println("hello world"));
          }
      
    • Executors.newScheduledThreadPool(): 创建一个定时线程池。实现为:

          public ScheduledThreadPoolExecutor(int corePoolSize) {
              super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                    new DelayedWorkQueue());
          }
      

      最大线程数仍为Integer.MAX_VALUE,因此会有资源耗尽的风险,而且线程存活时间为0,线程复用率低。

      使用方式:

          @Test
          public void scheduledThreadPool() throws InterruptedException {
              ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
              scheduledExecutorService.schedule(() -> System.out.println("hello world"), 10, TimeUnit.SECONDS);
              TimeUnit.SECONDS.sleep(20);
          }
      

    上面集中线程池的创建方式虽然方便,但是都有缺陷,而且都会造成资源耗尽的风险。

    阿里巴巴代码规约规定:禁止通过Executors创建线程池,必须通过new ThreadPoolExecutor的方式创建线程池。除了因为Executors创建的线程池存在上面的缺陷外,Executors隐藏了线程池的创建细节参数,可读性差,而且会影响初学者。

    线程工厂

    线程池在创建时需要手动指定线程工厂,线程工厂是为了创建线程时为线程指定名字,出问题时方便排查错误。Executors提供了一个默认的线程工厂的实现:

        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    

    但是这个默认工厂的实现,但是在实际使用中为了能够在出问题时能够方便快速的定位,需要定义一个有意义的线程名。因此需要自定义线程工厂实现。

    ThreadGroup

    在DefaultThreadFactory中我们看到一个ThreadGroup的类,ThreadGroup--线程组。

    我们可以把线程归属到某个线程组中,线程组中可以包含多个线程组,线程和线程组间组成树状关系。使用线程组可以方便我们管理线程。

    查看ThreadGroup的构造方法:

    		public ThreadGroup(String name) {
            this(Thread.currentThread().getThreadGroup(), name);
        }
    
    		public ThreadGroup(ThreadGroup parent, String name) {
            this(checkParentAccess(parent), parent, name);
        }
    

    可以指定父线程组创建线程组,当不指定父线程组时,使用当前线程作为父线程组。

    创建线程关联线程组:

    package com.xiazhi.pool;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class ThreadGroupDemo {
    
        static void run() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
            System.out.println("hello threadGroup");
        }
        public static void main(String[] args) {
            // 使用new ThreadGroup()
            ThreadGroup threadGroup = new ThreadGroup("test-group-1");
            ThreadGroup subGroup = new ThreadGroup(threadGroup, "test-group-1");
            // 创建线程时指定线程组
            new Thread(threadGroup, ThreadGroupDemo::run, "thread-1").start();
            new Thread(threadGroup, ThreadGroupDemo::run, "thread-2").start();
            new Thread(subGroup, ThreadGroupDemo::run, "sub-thread-1").start();
    
            // 获取活动线程数及线程组数
            System.out.println("threadGroup.activeCount() = " + threadGroup.activeCount());
            // 活动线程组数
            System.out.println("threadGroup.activeGroupCount() = " + threadGroup.activeGroupCount());
            // 打印线程组名称
            System.out.println("threadGroup.getName() = " + threadGroup.getName());
            // 输出线程组的所有子节点
            threadGroup.list();
            // 调用interrupt方法会将线程组的所有线程中断标志设置为true
            threadGroup.interrupt();
        }
    }
    

    自定义线程工厂

    自定义线程工厂需要实现ThreadFactory接口。

    package com.xiazhi.pool;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class SimpleThreadFactory implements ThreadFactory {
        /** 线程自增id */
        private final AtomicInteger number = new AtomicInteger(0);
        /** 归属线程组 */
        private final ThreadGroup group;
        /** 线程名前缀 */
        private final String prefix;
    
        public SimpleThreadFactory() {
            this.group = new ThreadGroup("test-group");
            this.prefix = "pool-test-thread";
        }
    
        @Override
        public Thread newThread(Runnable task) {
            Thread thread = new Thread(this.group, task,
                    this.prefix + this.number.incrementAndGet());
    
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            return thread;
        }
    }
    
    守护线程

    在上面的代码中我们注意到有这么一句代码。

    				if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
    

    这段代码的意思是:判断当前线程是否是守护线程,如果是守护线程,那么就将当前线程设置为正常线程。

    什么是守护线程?

    守护线程是一个特殊的线程,当进程中不存在非守护线程时,守护线程就会销毁。jvm中的垃圾回收器就是守护线程,当jvm中没有运行中的非守护线程时,jvm就会退出。

    守护线程的用法如下:

    package com.xiazhi.pool;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class DaemonThreadDemo {
    
        static void daemonRun() {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("守护线程运行");
            }
        }
    
        public static void main(String[] args) {
    
            Thread daemon = new Thread(DaemonThreadDemo::daemonRun);
            daemon.setDaemon(true);
            Thread thread = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "开始执行");
                try {
                    // 模拟线程处理业务耗时
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行结束");
            });
            thread.start();
            daemon.start();
        }
    }
    

    线程池的拒绝策略RejectedExecutionHandler

    根据上面的线程池执行的流程图,可以知道当线程池的等待队列已满,而且线程池已经达到最大线程数,那么后面再来的任务就回调用拒绝策略处理。java线程池默认提供了4种拒绝策略:

    • AbortPolicy: 抛出异常处理。当线程进入拒绝策略时,就会抛出RejectedExecutionException异常。
    • DiscardPolicy: 丢弃被拒绝的任务。
    • DiscardOldestPolicy: 丢弃最老的未被处理的任务。
    • CallerRunsPolicy: 调用线程处理。谁提交的这个任务,那么就叫给这个线程处理。

    上面四种线程拒绝策略,无论哪儿一种,再生产环境时都是不可取的,我们在工作中,一般都会自定义拒绝策略,将任务存入mq或存入其他地方,等待线程池空闲时执行。但是不能不执行。

    自定义拒绝策略:

    自定义拒绝策略只需要实现RejectedExecutionHandler接口。

    package com.xiazhi.pool;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class MessageQueuePolicy implements RejectedExecutionHandler {
    
        /** 模拟为mq容器,拒绝任务会被放入容器中 */
        public final List<CustomTask> list = new ArrayList<>();
    
    
        static class CustomTask implements Runnable, Serializable {
    
            private final String name;
    
            public CustomTask(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(String.format("[%s]处理任务:%s", Thread.currentThread().getName(), name));
            }
        }
    
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            // 线程池未关闭
            if (!executor.isShutdown()) {
                System.out.println("进入线程拒绝策略...");
                CustomTask task = (CustomTask) runnable;
                list.add(task);
            }
        }
    }
    
    创建一个自定义线程工厂及拒绝策略的线程池
    package com.xiazhi.pool;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class CustomThreadPoolDemo {
    
        private final ExecutorService executorService;
    
        public CustomThreadPoolDemo() {
            executorService = new ThreadPoolExecutor(3,
                    3,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),
                    new SimpleThreadFactory(),
                    new MessageQueuePolicy());
        }
    
        public static void main(String[] args) {
            CustomThreadPoolDemo poolDemo = new CustomThreadPoolDemo();
    
            for (int i = 0; i < 20; i++) {
                poolDemo.executorService.execute(new MessageQueuePolicy.CustomTask("张三"));
            }
        }
    }
    

    线程池的使用

    线程池ThreadPoolExecutor实现了ExecutorService接口,而ExecutorService接口又继承了Executor接口,线程池的常用方法有:

    • void execute(Runnable command): 提交Runnable任务
    • Future<?> submit(Runnable task): 提交Runnable任务并返回代表该任务的Future,任务成功完成后,调用Futureget()方法将返回null
    • Future<?> submit(Runnable task, T result): 提交Runnable任务并返回代表该任务的Future, 任务完成后调用Futureget()方法将返回给定的result。
    • Future<T> submit(Callable<T> task): 提交带返回值的任务并返回代表该任务的Future,任务执行完成后调用Futureget()方法获取任务执行返回值。
    • void shutdown(): 启动关闭线程池。在线程池关闭过程中,会继续执行已经提交的任务(包含等待队列中的任务),但是不会接收新的任务,当所有任务都执行完毕,线程池关闭。如果线程池已经关闭,再次调用不会有影响。
    • List<Runnable> shutdownNow(): 马上关闭线程池。暂停所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务列表。会给所有正在执行的线程发送interrupt()中断消息。
    • boolean isShutdown(): 线程池是否关闭。返回是否在执行线程池的关闭程序,也可以说返回是否调用过shutdown()方法。
    • boolean isTerminated(): 线程池是否终止。isShutdown()方法返回的是是否线程池调用过shutdown()shutdownNow()方法,而无论调用哪儿个方法线程池都不会立即完毕,会处理完当前线程的任务或设置线程中断。无论哪儿种方式,都是无法立即结束线程的。而此方法返回的就是当前线程池中是否还有存活的线程。线程的生命周期,终止状态为Terminate,当线程池中所有线程的状态都为Terminate时,返回true。
    • boolean awaitTermination(long timeout, TimeUnit unit): 等待终止,如果超时,立即终止
    • List<Future<T>> invokeAll(Collection< ? extend Callable<T>> tasks): 批量执行任务,并返回代表人物的Future集合。
    • T invokeAny(Collection<? extend Callable<T>> tasks): 批量执行任务,当有一个任务有执行结果时,返回此结果并取消其他的任务。

    下面进入方法实践:

    package com.xiazhi.pool;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class UseThreadPoolDemo {
    
        private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3,
                5,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20),
                new SimpleThreadFactory());
    
        static void runCommand() {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(String.format("[%s]开始执行command", Thread.currentThread().getName()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        static String runTask() {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(String.format("[%s]开始执行task", Thread.currentThread().getName()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 提交无返回值的Runnable任务
            poolExecutor.execute(UseThreadPoolDemo::runCommand);
            // 等待线程执行结束
            TimeUnit.SECONDS.sleep(3);
            System.out.println("============================================================");
    
            // submit提交Runnable返回null值得任务
            Future<?> result = poolExecutor.submit(UseThreadPoolDemo::runCommand);
            System.out.println("submit(Runnable) result= " + result.get());
            System.out.println("============================================================");
    
            // 当任务执行结束返回指定返回值
            Future<String> successful = poolExecutor.submit(UseThreadPoolDemo::runCommand, "successful");
            System.out.println("successful.get() = " + successful.get());
            System.out.println("============================================================");
    
            // 提交带返回值得Callable任务
            Future<String> submit = poolExecutor.submit(UseThreadPoolDemo::runTask);
            System.out.println("submit.get() = " + submit.get());
            System.out.println("============================================================");
    
            // 批量提交任务执行任务,返回批量结果
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                tasks.add(UseThreadPoolDemo::runTask);
            }
            List<Future<String>> futures = poolExecutor.invokeAll(tasks);
            for (Future<String> future : futures) {
                System.out.println("future.get() = " + future.get());
            }
            System.out.println("============================================================");
    
    
            // 批量提交任务,成功任意一个取消其他任务
            List<Callable<String>> anyTasks = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                tasks.add(UseThreadPoolDemo::runTask);
            }
            String resultStr = poolExecutor.invokeAny(tasks);
            System.out.println("resultStr = " + resultStr);
            System.out.println("============================================================");
    
            // 关闭线程池
            poolExecutor.shutdown();
            System.out.println("poolExecutor.isShutdown() = " + poolExecutor.isShutdown());
            System.out.println("============================================================");
    
            System.out.println("poolExecutor.isTerminated() = " + poolExecutor.isTerminated());
            System.out.println("============================================================");
    
            TimeUnit.SECONDS.sleep(3);
            System.out.println("poolExecutor.isTerminated() = " + poolExecutor.isTerminated());
        }
    }
    

    线程池的大小设置

    线程池的线程数大小是否越大越好?

    在回答这个问题前先看下面代码:

    package com.xiazhi.pool;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 要使用线程池启用多线程对一个数进行自增,从0加到1_000_000
     *
     * @author 赵帅
     * @date 2021/1/21
     */
    public class PoolThreadNumDemo {
    
        /**
         * 方式1,启动100个线程每隔线程加10_000
         */
        public static void way1() throws InterruptedException, ExecutionException {
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 30L,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            AtomicInteger num = new AtomicInteger();
            ArrayList<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                callables.add(() -> {
                    for (int j = 0; j < 10_000; j++) {
                        num.incrementAndGet();
                    }
                    return num.get();
                });
            }
    
            long start = System.nanoTime();
            List<Future<Integer>> futures = poolExecutor.invokeAll(callables);
            for (Future<Integer> future : futures) {
                future.get();
            }
            System.out.println(String.format("way1调用总耗时:%s,结果:%s", (System.nanoTime() - start), num.get())); // 耗时:82152778
        }
    
    
        /**
         * 方式2,启动10个线程每隔线程加1_000_000
         */
        public static void way2() throws InterruptedException, ExecutionException {
            int core = Runtime.getRuntime().availableProcessors();
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, core + 1, 30L,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            AtomicInteger integer = new AtomicInteger(0);
            ArrayList<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                callables.add(() -> {
                    for (int j = 0; j < 100_000; j++) {
                        integer.incrementAndGet();
                    }
                    return integer.get();
                });
            }
    
            long start = System.nanoTime();
            List<Future<Integer>> futures = poolExecutor.invokeAll(callables);
            for (Future<Integer> future : futures) {
                future.get();
            }
            System.out.println(String.format("way2调用总耗时:%s,结果:%s", (System.nanoTime() - start), integer.get()));// 耗时:32637464
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            way1();
            way2();
        }
    }
    

    通过上面代码可以知道并不是线程越多越好,因为线程越多带来的后果就是线程切换频繁,cpu时间都耗费在线程切换上,cpu的利用率也就变相的降低了。

    那么线程数如何设置:

    线程数的设置与电脑配置,也就是CPU核数有关,还要关联业务。一般可以根据公式估算线程数:

    线程数=N*cpu利用率*(1+等待时间/计算时间)

    当然这个公式只能作为估算值,具体的值需要根据业务以及压测结果进行调整。

    线程池的拓展

    线程池提供了钩子函数,可以在线程执行前,执行后,以及线程池销毁时对线程池进行自定义扩展。

    package com.xiazhi.pool;
    
    import java.util.concurrent.*;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class ThreadPoolExecutorProvider extends ThreadPoolExecutor {
        public ThreadPoolExecutorProvider(int corePoolSize,
                                          int maximumPoolSize,
                                          long keepAliveTime,
                                          TimeUnit unit,
                                          BlockingQueue<Runnable> workQueue,
                                          ThreadFactory threadFactory,
                                          RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("线程执行前执行");
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("线程执行后执行");
        }
    
        @Override
        protected void terminated() {
            System.out.println("线程池被终止");
        }
    
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutorProvider(1,
                    1,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),
                    Executors.defaultThreadFactory(),
                    new AbortPolicy());
    
            executor.execute(() -> System.out.println("执行任务。。。"));
            executor.shutdown();
        }
    }
    

    线程池源码阅读

    线程池代码中有一个属性:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));这个属性是AtomicInteger类型,因此是原子性的。而且初始值为:initialValue = ctlOf(RUNNING,0)。继续查看源码:RUNNINT = -1<<29,那么最终RUNNING的值为11100000000000000000000000000000,可以注意到前三位是1,后面都是0。

    继续看ctlOf(RUNNING,0)方法内部:private static int ctlOf(int rs, int wc) { return rs | wc; },那么最终ctl属性的初始值就是:11100000000000000000000000000000,其中后29位表示的是当前线程池中的线程数,而前3位表示线程池的状态。这些可以从代码中看出:

    		private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
    		// 获取当前线程池状态
    		private static int runStateOf(int c)     { return c & ~CAPACITY; }
    		// 获取当前线程池中线程数量
    		private static int workerCountOf(int c)  { return c & CAPACITY; }
    

    到目前为止,我们可以得出线程池可以存放的最大线程数为00011111111111111111111111111111既2^29-1=536870911个线程

    查看线程池的execute(Runnable command)方法的源码:

    public void execute(Runnable command) {
      
      			// 要执行的任务不能是null
            if (command == null)
                throw new NullPointerException();
      			// 获取ctl的值
            int c = ctl.get();
      			// 获取当前线程数
            if (workerCountOf(c) < corePoolSize) {
              // 当前线程数小于核心线程数,直接添加新的线程,结束
                if (addWorker(command, true))
                    return;
              // 再次获取ctl的值,避免在此期间线程的状态被改变,保证数据的准确性
                c = ctl.get();
            }
      			// 当线程走到这里,说明当前线程池核心线程数已经满了
      			// 线程池状态为运行中并且向等待队列插入任务成功
            if (isRunning(c) && workQueue.offer(command)) {
              	// 再次获取ctl值,保证获取最新值
                int recheck = ctl.get();
                // 当前线程池是关闭状态,那么从队列中移除这个任务并拒绝
                if (! isRunning(recheck) && remove(command))
                    reject(command);
              // 如果线程池不是关闭状态或者移除队列任务失败,那么获取当前线程数,如果是0就添加一个空的worker
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
      			// 插入等待队列失败,新建线程处理
            else if (!addWorker(command, false))
              	// 新建任务失败,说明达到最大线程数,调用拒绝策略
                reject(command);
        }
    

    可以看出上面的流程分析与最开始画的线程池工作流程图是一样的,不过在细节上添加了很多判断当前线程池状态的操作。可以看出在代码中多次重新获取ctl值,用来获取当前线程池状态以及当前线程数。那么为什么要用一个值即表示线程池状态又表示线程数呢?

    如果将这两个数拆出来拆成两个值,那么就需要不断的刷新两个数据的值,也就是说上面不断的获取ctl值需要获取两个值了,那么假设这种情况,我获取线程池状态,状态为运行中,然后我获取当前线程数,假设此时线程池关闭了,那么此时是不知道的,就会造成获取到的状态不是实时状态,而使用一个数操作,不仅操作方便,而且也避免了数据实时性的问题。(仅代表个人理解,如果有高见请指出)

    然后我们在看上面代码,是通过addWorker(Runnable command,boolean core)方法来向线程池添加线程的。

    • 参数command:要执行的任务
    • 参数core: 是否是核心线程,因为核心线程是不会销毁的,而不是核心线程,当空闲时间超过存活时间会自动销毁。

    查看addWorker方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
      			// 自旋,保证修改当前线程数一定能成功
            for (;;) {
              // 获取线程池状态,如果线程池已经被关闭,那么就不添加到线程池了
                int c = ctl.get();
                int rs = runStateOf(c);
    
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                for (;;) {
                  // 如果超过线程数限制返回失败,是核心线程就比较核心线程数,不是核心线程就比较最大线程数
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                  // 线程数+1
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
      			// 走到这里才真正想线程池中添加线程
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
              // 创建一个新的线程,线程池中使用Worker创建线程,并将当前任务作为线程的第一个任务
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                          // 将线程添加到线程池
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                      // 启动线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    上面创建线程池中的线程使用的是Worker来创建线程。那么创建的Worker都存放在线程池,线程池的实现就是:private final HashSet<Worker> workers = new HashSet<Worker>();这个hashSet就是线程池。查看Worker的代码:

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
              // 设置状态为-1表示此worker为新建状态,还未开启线程,不支持中断
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    可以看出Worker也继承自AQS,我们学习AQS时说AQS的核心就是state。在线程池中这个state就代表当前线程是否被使用。而且Worker也实现了Runnable接口,因此它自身就是一个线程任务,他内部的属性Thread在创建时调用了线程工程创建一个新线程。因此说一个Worker就是代表了一个可执行线程。而且这个可执行线程的run方法是:

            public void run() {
                runWorker(this);
            }
    

    查看runWorker方法:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
      // 获取当前线程的第一个任务并将firstTask属性设置为null(也就是取出了worker的第一个任务)
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
              // 如果worker的第一个任务不等于空 或者 从等待队列取到的任务不是空
                while (task != null || (task = getTask()) != null) {
                  // 执行线程的run方法。
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                      //调用钩子函数
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                          //运行run方法
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                          // 调钩子函数 
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
              // 控制线程退出
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    runWorker方法中我们学习到了线程池是如何复用线程的。当一个worker的线程被启动时,会调用runWorker方法,然后内部流程为:

    1. 取出worker的firstTask。
    2. firstTask是否为空,如果为空则跳至4。
    3. 执行firstTask的run方法。
    4. 从等待队列取出task,如果task不为空则跳至2,如果为空则往下。
    5. 控制线程退出。

    流程图如下:

    image-20210122094621712

    如果程序运行过程中出现异常,或者等待队列为空时就要退出异常线程或空闲线程。退出空闲线程的流程为:

    1. 当前线程是否需要退出(异常结束线程直接退出)
    2. 从线程池移除当前线程
    3. 如果当前线程池的线程数小于核心线程数,那么添加一个任务为空的worker

    ExecutorCompletionService

    在我们执行有返回值的任务时,会返回一个Future类型,可以通过future.get()方法获取返回值,但是这个get()方法时阻塞的。因此就可能会出现其他任务已经完成结果,但是我们还在阻塞等待前一个任务的结果。这样算是间接的浪费了时间。查看如下代码:

    package com.xiazhi.pool;
    
    import java.util.concurrent.*;
    
    /**
     * 假设我们现在在网上购买了电器: 电视,冰箱,洗衣机。这三个东西是一块发货的,因此可以认为这就是一个异步执行。
     * 冰箱: 发货 -> 收货 10s
     * 洗衣机: 发货-> 收货 15s
     * 电视: 发货 -> 收货 8s
     * 我们将电器从楼下搬到楼上的时间为3s
     *
     * @author 赵帅
     * @date 2021/1/22
     */
    public class NormalExecutorService {
    
        /**
         * 为了方便使用Executors,实际工作不建议使用
         */
        final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        static void transport(String shop) {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("搬运" + shop);
        }
    
        /**
         * 运输冰箱
         */
        static String transportFridge() {
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("冰箱到了");
            return "冰箱";
        }
    
        /**
         * 运输电视
         */
        static String transportTV() {
    
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("电视到了");
            return "TV";
        }
    
        /**
         * 运输洗衣机
         */
        static String transportWashing() {
    
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("洗衣机到了");
            return "洗衣机";
        }
    
    
        /**
         * 根据上面的题意,如果我们用正常的Future.get()方法去实现的话:
         */
        void way1() throws ExecutionException, InterruptedException {
            
            long start = System.currentTimeMillis();
            Future<String> fridge = executorService.submit(NormalExecutorService::transportFridge);
            Future<String> washing = executorService.submit(NormalExecutorService::transportWashing);
            Future<String> tv = executorService.submit(NormalExecutorService::transportTV);
    
            transport(fridge.get());
            transport(washing.get());
            transport(tv.get());
            System.out.println("从下单到搬运到家共计耗时:" + (System.currentTimeMillis() - start));
            executorService.shutdown();
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NormalExecutorService service = new NormalExecutorService();
            service.way1();
        }
    }
    

    根据执行结果可以看出,电视先到了,但是并没有先把电视机搬上楼,而是等待冰箱到了之后才搬运冰箱,然后等待洗衣机到搬运洗衣机,最后才搬运电视。总共耗时21秒。

    或许我们会想调换get的顺序,先搬运电视不就好了。现在是我们知道每个任务的时间,我们或许可以调整,但是在实际工作中,大多数并行情况我们并不能确定执行的时间(网络延迟等影响),因此我们也就无法确定我们应该先等待哪儿个到达。

    正确的处理思路是先到哪儿个先搬哪儿个。ExecutorCompletionService就是这样的一个作用。我们使用ExecutorCompletionService实现上面的过程:

    package com.xiazhi.pool;
    
    import java.util.concurrent.*;
    
    /**
     * 假设我们现在在网上购买了电器: 电视,冰箱,洗衣机。这三个东西是一块发货的,因此可以认为这就是一个异步执行。
     * 冰箱: 发货 -> 收货 10s
     * 洗衣机: 发货-> 收货 15s
     * 电视: 发货 -> 收货 8s
     * 我们将电器从楼下搬到楼上的时间为3s
     *
     * @author 赵帅
     * @date 2021/1/22
     */
    public class ExecutorCompolationServiceDemo {
        /**
         * 为了方便使用Executors,实际工作不建议使用
         */
        final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        static void transport(String shop) {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("搬运" + shop);
        }
    
        /**
         * 运输冰箱
         */
        static String transportFridge() {
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("冰箱到了");
            return "冰箱";
        }
    
        /**
         * 运输电视
         */
        static String transportTV() {
    
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("电视到了");
            return "TV";
        }
    
        /**
         * 运输洗衣机
         */
        static String transportWashing() {
    
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("洗衣机到了");
            return "洗衣机";
        }
    
    
        /**
         * 使用executorCompletionService实现
         */
        void way2() throws ExecutionException, InterruptedException {
            ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    
            long start = System.currentTimeMillis();
            completionService.submit(NormalExecutorService::transportFridge);
            completionService.submit(NormalExecutorService::transportWashing);
            completionService.submit(NormalExecutorService::transportTV);
    
            for (int i = 0; i < 3; i++) {
                transport(completionService.take().get());
            }
    
            System.out.println("从下单到搬运到家共计耗时:" + (System.currentTimeMillis() - start));
            executorService.shutdown();
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorCompolationServiceDemo service = new ExecutorCompolationServiceDemo();
            service.way2();
        }
    }
    

    可以看到执行时间在18秒,提高了3秒。

    ForkJoinPool

    forkjoin是指任务的拆分和合并。forkJoinPool会将一个任务拆分成多个任务队列,也就是fork,每一个任务队列是一个线程,当一个任务队列中的任务执行完之后,会从其他任务队列中偷取任务执行,然后执行结果再进行合并也就是join。java8新特性中的并行流就是通过forkJoinPool实现的。

    package com.xiazhi.pool;
    
    import java.util.Arrays;
    
    /**
     * @author 赵帅
     * @date 2021/1/22
     */
    public class ParallelStreamDemo {
        static String[] array = new String[1_000_000];
    
        public static void main(String[] args) {
            for (int i = 0; i < array.length; i++) {
                array[i] = "hello" + i;
            }
    
            Arrays.stream(array).parallel().forEach(f->{
                System.out.println(Thread.currentThread().getName() + ":value:" + f);
            });
        }
    }
    

    观察结果可以看到启动了很多的线程

  • 相关阅读:
    CODING x 百果园 _ 水果零售龙头迈出 DevOps 体系建设第一步
    Nocalhost 亮相 CD Foundation 国内首届 Meetup,Keith Chan 将出席致辞
    做云原生时代标准化工具,实现高效云上研发工作流
    打造数字化软件工厂 —— 一站式 DevOps 平台全景解读
    WePack —— 助力企业渐进式 DevOps 转型
    CODING Compass —— 打造行云流水般的软件工厂
    Nocalhost —— 让云原生开发回归原始而又简单
    CODING 代码资产安全系列之 —— 构建全链路安全能力,守护代码资产安全
    Nocalhost:云原生开发新体验
    使用 Nocalhost 开发 Kubernetes 中的 APISIX Ingress Controller
  • 原文地址:https://www.cnblogs.com/Zs-book1/p/14312491.html
Copyright © 2011-2022 走看看