zoukankan      html  css  js  c++  java
  • 线程池

    八、线程池

    ThreadPoolExecutor

    在之前的demo中,都是使用new Thread()手动创建线程池。但是在工作中使用的话,阿里巴巴编码规约明确说明,线程必须交给线程池来管理。避免资源耗尽的风险。

    传统的手动new的方式创建的线程,如果线程非常多的话,就会非常杂乱,无法管理。线程之间互相竞争资源,容易产生线程乱入的风险。线程如果非常多,会造成线程切换频繁,浪费cpu资源。线程多也会增加系统资源耗尽的风险。

    所以我们必须使用线程池管理。

    关于ThreadPoolExecutor

    先来查看ThreadPoolExecutor的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 
    

    构造方法包含的参数有:

    • corePoolSize:线程池的核心线程数
    • maximumPoolSize: 线程池最大线程数
    • keepAliveTime:空闲线程的存活时间
    • unit:空闲线程存活时间单位
    • workQueue:线程池等待队列。
    • threadFactory:线程工厂,生产线程的类。
    • handler:线程拒绝策略。当线程池满的时候,该怎么拒绝后来的线程。

    要想理解上面的参数。我们首先要理解线程池的工作原理,线程池的存在就是为了管理线程。池化思想的实现无非就是:节省资源,提速,资源复用,方便管理。线程池的实现也是为了方便管理线程,复用现有线程,节省线程资源开销,避免线程过多时cpu浪费大量时间在线程的切换上。

    当我们使用线程池执行任务时,会经历如下流程:

    image-20210120163025578

    上图就是线程池的大概的工作流程,看了上面的图,大概就对线程池的参数的意义有了大概的了解了。

    线程池的创建

    线程池的创建可以通过new ThreadPoolExecutor()来实现。

    package com.xiazhi.pool;
    
    
    import java.util.concurrent.*;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class ExecutorCreateDemo {
    
        public static void main(String[] args) {
    
            // 如果不指定threadFactory,会使用默认的线程工厂 Executors.defaultThreadFactory()
            // 如果不指定拒绝策略,会使用默认的拒绝策略 new AbortPolicy()
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4,
                    8,
                    60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(100));
        }
    }
    

    java通过Executors类默认实现了几个线程池:

    • Executors.newSingleThreadExecutor(): 这是一个单线程的线程池,实现方式为

      		public static ExecutorService newSingleThreadExecutor() {
              return new FinalizableDelegatedExecutorService
                  (new ThreadPoolExecutor(1, 1,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>()));
          }
      

      可以看到也是通过new ThreadPoolExecutor的方式实现,不过这个线程池的核心线程数和最大线程数都是1,因此里面永远只有一个线程。而且他的等待队列为 LinkedBlockingQueue, 这个队列是无界队列,当任务的生产速度大于消费速度时,队列就会不停的堆积,容易造成内存溢出。

      只有一个队列,为什么要使用线程池?

      可以使用线程池来管理这个线程的生命周期。

      使用方式:

        	@Test
          public void singleThreadExecutor() {
              ExecutorService executorService = Executors.newSingleThreadExecutor();
              executorService.execute(() -> System.out.println("hello world"));
          }
      
    • Executors.newFixedThreadPool(int size): 创建一个固定数量的线程池。实现为:

      		public static ExecutorService newFixedThreadPool(int nThreads) {
              return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>());
          }
      

      可以看到,在创建线程池时,核心线程数和最大线程数一样,而且使用的也是LinkedBlockingQueue,使用的弊端与上面一样,容易造成内存溢出。

      使用方式:

          @Test
          public void fixedThreadPool() {
              ExecutorService service = Executors.newFixedThreadPool(5);
              service.execute(() -> System.out.println("hello world"));
          }
      
    • Executors.newCachedThreadPool(): 创建一个缓存线程池。此线程池的实现方式为:

      		public static ExecutorService newCachedThreadPool() {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>());
          }
      

      可以看到这个线程池的核心线程数为0,最大线程数为Integer.MAX_VALUE,可以认为最大线程数是无界的。存活时间是60秒,如果线程超过60秒没有新的任务,那么就会被销毁。使用的是SynchronousQueue。分析队列为SynchronousQueue,也就以为着每次又一个任务到达时,就必须有一个线程处理这个任务,否则就会阻塞等待。那么当一个任务到达时,如果没有空闲线程,就会不断的创建新的线程,而最大线程数为无界,当任务生产速度大于消费速度时就会增加资源耗尽的风险。而且线程数比较大时也会增加上下文切换的开销,最终造成cpu的时间全都浪费在线程切换上,最终反而降低性能。

      使用方式:

          @Test
          public void cachedThreadPool() {
              ExecutorService executorService = Executors.newCachedThreadPool();
              executorService.execute(() -> System.out.println("hello world"));
          }
      
    • Executors.newScheduledThreadPool(): 创建一个定时线程池。实现为:

          public ScheduledThreadPoolExecutor(int corePoolSize) {
              super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                    new DelayedWorkQueue());
          }
      

      最大线程数仍为Integer.MAX_VALUE,因此会有资源耗尽的风险,而且线程存活时间为0,线程复用率低。

      使用方式:

          @Test
          public void scheduledThreadPool() throws InterruptedException {
              ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
              scheduledExecutorService.schedule(() -> System.out.println("hello world"), 10, TimeUnit.SECONDS);
              TimeUnit.SECONDS.sleep(20);
          }
      

    上面集中线程池的创建方式虽然方便,但是都有缺陷,而且都会造成资源耗尽的风险。

    阿里巴巴代码规约规定:禁止通过Executors创建线程池,必须通过new ThreadPoolExecutor的方式创建线程池。除了因为Executors创建的线程池存在上面的缺陷外,Executors隐藏了线程池的创建细节参数,可读性差,而且会影响初学者。

    线程工厂

    线程池在创建时需要手动指定线程工厂,线程工厂是为了创建线程时为线程指定名字,出问题时方便排查错误。Executors提供了一个默认的线程工厂的实现:

        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    

    但是这个默认工厂的实现,但是在实际使用中为了能够在出问题时能够方便快速的定位,需要定义一个有意义的线程名。因此需要自定义线程工厂实现。

    ThreadGroup

    在DefaultThreadFactory中我们看到一个ThreadGroup的类,ThreadGroup--线程组。

    我们可以把线程归属到某个线程组中,线程组中可以包含多个线程组,线程和线程组间组成树状关系。使用线程组可以方便我们管理线程。

    查看ThreadGroup的构造方法:

    		public ThreadGroup(String name) {
            this(Thread.currentThread().getThreadGroup(), name);
        }
    
    		public ThreadGroup(ThreadGroup parent, String name) {
            this(checkParentAccess(parent), parent, name);
        }
    

    可以指定父线程组创建线程组,当不指定父线程组时,使用当前线程作为父线程组。

    创建线程关联线程组:

    package com.xiazhi.pool;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class ThreadGroupDemo {
    
        static void run() {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
            System.out.println("hello threadGroup");
        }
        public static void main(String[] args) {
            // 使用new ThreadGroup()
            ThreadGroup threadGroup = new ThreadGroup("test-group-1");
            ThreadGroup subGroup = new ThreadGroup(threadGroup, "test-group-1");
            // 创建线程时指定线程组
            new Thread(threadGroup, ThreadGroupDemo::run, "thread-1").start();
            new Thread(threadGroup, ThreadGroupDemo::run, "thread-2").start();
            new Thread(subGroup, ThreadGroupDemo::run, "sub-thread-1").start();
    
            // 获取活动线程数及线程组数
            System.out.println("threadGroup.activeCount() = " + threadGroup.activeCount());
            // 活动线程组数
            System.out.println("threadGroup.activeGroupCount() = " + threadGroup.activeGroupCount());
            // 打印线程组名称
            System.out.println("threadGroup.getName() = " + threadGroup.getName());
            // 输出线程组的所有子节点
            threadGroup.list();
            // 调用interrupt方法会将线程组的所有线程中断标志设置为true
            threadGroup.interrupt();
        }
    }
    

    自定义线程工厂

    自定义线程工厂需要实现ThreadFactory接口。

    package com.xiazhi.pool;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class SimpleThreadFactory implements ThreadFactory {
        /** 线程自增id */
        private final AtomicInteger number = new AtomicInteger(0);
        /** 归属线程组 */
        private final ThreadGroup group;
        /** 线程名前缀 */
        private final String prefix;
    
        public SimpleThreadFactory() {
            this.group = new ThreadGroup("test-group");
            this.prefix = "pool-test-thread";
        }
    
        @Override
        public Thread newThread(Runnable task) {
            Thread thread = new Thread(this.group, task,
                    this.prefix + this.number.incrementAndGet());
    
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            return thread;
        }
    }
    
    守护线程

    在上面的代码中我们注意到有这么一句代码。

    				if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
    

    这段代码的意思是:判断当前线程是否是守护线程,如果是守护线程,那么就将当前线程设置为正常线程。

    什么是守护线程?

    守护线程是一个特殊的线程,当进程中不存在非守护线程时,守护线程就会销毁。jvm中的垃圾回收器就是守护线程,当jvm中没有运行中的非守护线程时,jvm就会退出。

    守护线程的用法如下:

    package com.xiazhi.pool;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 赵帅
     * @date 2021/1/20
     */
    public class DaemonThreadDemo {
    
        static void daemonRun() {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("守护线程运行");
            }
        }
    
        public static void main(String[] args) {
    
            Thread daemon = new Thread(DaemonThreadDemo::daemonRun);
            daemon.setDaemon(true);
            Thread thread = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "开始执行");
                try {
                    // 模拟线程处理业务耗时
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行结束");
            });
            thread.start();
            daemon.start();
        }
    }
    

    线程池的拒绝策略RejectedExecutionHandler

    根据上面的线程池执行的流程图,可以知道当线程池的等待队列已满,而且线程池已经达到最大线程数,那么后面再来的任务就回调用拒绝策略处理。java线程池默认提供了4种拒绝策略:

    • AbortPolicy: 抛出异常处理。当线程进入拒绝策略时,就会抛出RejectedExecutionException异常。
    • DiscardPolicy: 丢弃被拒绝的任务。
    • DiscardOldestPolicy: 丢弃最老的未被处理的任务。
    • CallerRunsPolicy: 调用线程处理。谁提交的这个任务,那么就叫给这个线程处理。

    上面四种线程拒绝策略,无论哪儿一种,再生产环境时都是不可取的,我们在工作中,一般都会自定义拒绝策略,将任务存入mq或存入其他地方,等待线程池空闲时执行。但是不能不执行。

    自定义拒绝策略:

    自定义拒绝策略只需要实现RejectedExecutionHandler接口。

    package com.xiazhi.pool;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class MessageQueuePolicy implements RejectedExecutionHandler {
    
        /** 模拟为mq容器,拒绝任务会被放入容器中 */
        public final List<CustomTask> list = new ArrayList<>();
    
    
        static class CustomTask implements Runnable, Serializable {
    
            private final String name;
    
            public CustomTask(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(String.format("[%s]处理任务:%s", Thread.currentThread().getName(), name));
            }
        }
    
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            // 线程池未关闭
            if (!executor.isShutdown()) {
                System.out.println("进入线程拒绝策略...");
                CustomTask task = (CustomTask) runnable;
                list.add(task);
            }
        }
    }
    
    创建一个自定义线程工厂及拒绝策略的线程池
    package com.xiazhi.pool;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class CustomThreadPoolDemo {
    
        private final ExecutorService executorService;
    
        public CustomThreadPoolDemo() {
            executorService = new ThreadPoolExecutor(3,
                    3,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),
                    new SimpleThreadFactory(),
                    new MessageQueuePolicy());
        }
    
        public static void main(String[] args) {
            CustomThreadPoolDemo poolDemo = new CustomThreadPoolDemo();
    
            for (int i = 0; i < 20; i++) {
                poolDemo.executorService.execute(new MessageQueuePolicy.CustomTask("张三"));
            }
        }
    }
    

    线程池的使用

    线程池ThreadPoolExecutor实现了ExecutorService接口,而ExecutorService接口又继承了Executor接口,线程池的常用方法有:

    • void execute(Runnable command): 提交Runnable任务
    • Future<?> submit(Runnable task): 提交Runnable任务并返回代表该任务的Future,任务成功完成后,调用Futureget()方法将返回null
    • Future<?> submit(Runnable task, T result): 提交Runnable任务并返回代表该任务的Future, 任务完成后调用Futureget()方法将返回给定的result。
    • Future<T> submit(Callable<T> task): 提交带返回值的任务并返回代表该任务的Future,任务执行完成后调用Futureget()方法获取任务执行返回值。
    • void shutdown(): 启动关闭线程池。在线程池关闭过程中,会继续执行已经提交的任务(包含等待队列中的任务),但是不会接收新的任务,当所有任务都执行完毕,线程池关闭。如果线程池已经关闭,再次调用不会有影响。
    • List<Runnable> shutdownNow(): 马上关闭线程池。暂停所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务列表。会给所有正在执行的线程发送interrupt()中断消息。
    • boolean isShutdown(): 线程池是否关闭。返回是否在执行线程池的关闭程序,也可以说返回是否调用过shutdown()方法。
    • boolean isTerminated(): 线程池是否终止。isShutdown()方法返回的是是否线程池调用过shutdown()shutdownNow()方法,而无论调用哪儿个方法线程池都不会立即完毕,会处理完当前线程的任务或设置线程中断。无论哪儿种方式,都是无法立即结束线程的。而此方法返回的就是当前线程池中是否还有存活的线程。线程的生命周期,终止状态为Terminate,当线程池中所有线程的状态都为Terminate时,返回true。
    • boolean awaitTermination(long timeout, TimeUnit unit): 等待终止,如果超时,立即终止
    • List<Future<T>> invokeAll(Collection< ? extend Callable<T>> tasks): 批量执行任务,并返回代表人物的Future集合。
    • T invokeAny(Collection<? extend Callable<T>> tasks): 批量执行任务,当有一个任务有执行结果时,返回此结果并取消其他的任务。

    下面进入方法实践:

    package com.xiazhi.pool;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class UseThreadPoolDemo {
    
        private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3,
                5,
                0L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20),
                new SimpleThreadFactory());
    
        static void runCommand() {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(String.format("[%s]开始执行command", Thread.currentThread().getName()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        static String runTask() {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(String.format("[%s]开始执行task", Thread.currentThread().getName()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 提交无返回值的Runnable任务
            poolExecutor.execute(UseThreadPoolDemo::runCommand);
            // 等待线程执行结束
            TimeUnit.SECONDS.sleep(3);
            System.out.println("============================================================");
    
            // submit提交Runnable返回null值得任务
            Future<?> result = poolExecutor.submit(UseThreadPoolDemo::runCommand);
            System.out.println("submit(Runnable) result= " + result.get());
            System.out.println("============================================================");
    
            // 当任务执行结束返回指定返回值
            Future<String> successful = poolExecutor.submit(UseThreadPoolDemo::runCommand, "successful");
            System.out.println("successful.get() = " + successful.get());
            System.out.println("============================================================");
    
            // 提交带返回值得Callable任务
            Future<String> submit = poolExecutor.submit(UseThreadPoolDemo::runTask);
            System.out.println("submit.get() = " + submit.get());
            System.out.println("============================================================");
    
            // 批量提交任务执行任务,返回批量结果
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                tasks.add(UseThreadPoolDemo::runTask);
            }
            List<Future<String>> futures = poolExecutor.invokeAll(tasks);
            for (Future<String> future : futures) {
                System.out.println("future.get() = " + future.get());
            }
            System.out.println("============================================================");
    
    
            // 批量提交任务,成功任意一个取消其他任务
            List<Callable<String>> anyTasks = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                tasks.add(UseThreadPoolDemo::runTask);
            }
            String resultStr = poolExecutor.invokeAny(tasks);
            System.out.println("resultStr = " + resultStr);
            System.out.println("============================================================");
    
            // 关闭线程池
            poolExecutor.shutdown();
            System.out.println("poolExecutor.isShutdown() = " + poolExecutor.isShutdown());
            System.out.println("============================================================");
    
            System.out.println("poolExecutor.isTerminated() = " + poolExecutor.isTerminated());
            System.out.println("============================================================");
    
            TimeUnit.SECONDS.sleep(3);
            System.out.println("poolExecutor.isTerminated() = " + poolExecutor.isTerminated());
        }
    }
    

    线程池的大小设置

    线程池的线程数大小是否越大越好?

    在回答这个问题前先看下面代码:

    package com.xiazhi.pool;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 要使用线程池启用多线程对一个数进行自增,从0加到1_000_000
     *
     * @author 赵帅
     * @date 2021/1/21
     */
    public class PoolThreadNumDemo {
    
        /**
         * 方式1,启动100个线程每隔线程加10_000
         */
        public static void way1() throws InterruptedException, ExecutionException {
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 30L,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            AtomicInteger num = new AtomicInteger();
            ArrayList<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                callables.add(() -> {
                    for (int j = 0; j < 10_000; j++) {
                        num.incrementAndGet();
                    }
                    return num.get();
                });
            }
    
            long start = System.nanoTime();
            List<Future<Integer>> futures = poolExecutor.invokeAll(callables);
            for (Future<Integer> future : futures) {
                future.get();
            }
            System.out.println(String.format("way1调用总耗时:%s,结果:%s", (System.nanoTime() - start), num.get())); // 耗时:82152778
        }
    
    
        /**
         * 方式2,启动10个线程每隔线程加1_000_000
         */
        public static void way2() throws InterruptedException, ExecutionException {
            int core = Runtime.getRuntime().availableProcessors();
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, core + 1, 30L,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            AtomicInteger integer = new AtomicInteger(0);
            ArrayList<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                callables.add(() -> {
                    for (int j = 0; j < 100_000; j++) {
                        integer.incrementAndGet();
                    }
                    return integer.get();
                });
            }
    
            long start = System.nanoTime();
            List<Future<Integer>> futures = poolExecutor.invokeAll(callables);
            for (Future<Integer> future : futures) {
                future.get();
            }
            System.out.println(String.format("way2调用总耗时:%s,结果:%s", (System.nanoTime() - start), integer.get()));// 耗时:32637464
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            way1();
            way2();
        }
    }
    

    通过上面代码可以知道并不是线程越多越好,因为线程越多带来的后果就是线程切换频繁,cpu时间都耗费在线程切换上,cpu的利用率也就变相的降低了。

    那么线程数如何设置:

    线程数的设置与电脑配置,也就是CPU核数有关,还要关联业务。一般可以根据公式估算线程数:

    线程数=N*cpu利用率*(1+等待时间/计算时间)

    当然这个公式只能作为估算值,具体的值需要根据业务以及压测结果进行调整。

    线程池的拓展

    线程池提供了钩子函数,可以在线程执行前,执行后,以及线程池销毁时对线程池进行自定义扩展。

    package com.xiazhi.pool;
    
    import java.util.concurrent.*;
    
    /**
     * @author 赵帅
     * @date 2021/1/21
     */
    public class ThreadPoolExecutorProvider extends ThreadPoolExecutor {
        public ThreadPoolExecutorProvider(int corePoolSize,
                                          int maximumPoolSize,
                                          long keepAliveTime,
                                          TimeUnit unit,
                                          BlockingQueue<Runnable> workQueue,
                                          ThreadFactory threadFactory,
                                          RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("线程执行前执行");
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("线程执行后执行");
        }
    
        @Override
        protected void terminated() {
            System.out.println("线程池被终止");
        }
    
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutorProvider(1,
                    1,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),
                    Executors.defaultThreadFactory(),
                    new AbortPolicy());
    
            executor.execute(() -> System.out.println("执行任务。。。"));
            executor.shutdown();
        }
    }
    

    线程池源码阅读

    线程池代码中有一个属性:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));这个属性是AtomicInteger类型,因此是原子性的。而且初始值为:initialValue = ctlOf(RUNNING,0)。继续查看源码:RUNNINT = -1<<29,那么最终RUNNING的值为11100000000000000000000000000000,可以注意到前三位是1,后面都是0。

    继续看ctlOf(RUNNING,0)方法内部:private static int ctlOf(int rs, int wc) { return rs | wc; },那么最终ctl属性的初始值就是:11100000000000000000000000000000,其中后29位表示的是当前线程池中的线程数,而前3位表示线程池的状态。这些可以从代码中看出:

    		private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
    		// 获取当前线程池状态
    		private static int runStateOf(int c)     { return c & ~CAPACITY; }
    		// 获取当前线程池中线程数量
    		private static int workerCountOf(int c)  { return c & CAPACITY; }
    

    到目前为止,我们可以得出线程池可以存放的最大线程数为00011111111111111111111111111111既2^29-1=536870911个线程

    查看线程池的execute(Runnable command)方法的源码:

    public void execute(Runnable command) {
      
      			// 要执行的任务不能是null
            if (command == null)
                throw new NullPointerException();
      			// 获取ctl的值
            int c = ctl.get();
      			// 获取当前线程数
            if (workerCountOf(c) < corePoolSize) {
              // 当前线程数小于核心线程数,直接添加新的线程,结束
                if (addWorker(command, true))
                    return;
              // 再次获取ctl的值,避免在此期间线程的状态被改变,保证数据的准确性
                c = ctl.get();
            }
      			// 当线程走到这里,说明当前线程池核心线程数已经满了
      			// 线程池状态为运行中并且向等待队列插入任务成功
            if (isRunning(c) && workQueue.offer(command)) {
              	// 再次获取ctl值,保证获取最新值
                int recheck = ctl.get();
                // 当前线程池是关闭状态,那么从队列中移除这个任务并拒绝
                if (! isRunning(recheck) && remove(command))
                    reject(command);
              // 如果线程池不是关闭状态或者移除队列任务失败,那么获取当前线程数,如果是0就添加一个空的worker
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
      			// 插入等待队列失败,新建线程处理
            else if (!addWorker(command, false))
              	// 新建任务失败,说明达到最大线程数,调用拒绝策略
                reject(command);
        }
    

    可以看出上面的流程分析与最开始画的线程池工作流程图是一样的,不过在细节上添加了很多判断当前线程池状态的操作。可以看出在代码中多次重新获取ctl值,用来获取当前线程池状态以及当前线程数。那么为什么要用一个值即表示线程池状态又表示线程数呢?

    如果将这两个数拆出来拆成两个值,那么就需要不断的刷新两个数据的值,也就是说上面不断的获取ctl值需要获取两个值了,那么假设这种情况,我获取线程池状态,状态为运行中,然后我获取当前线程数,假设此时线程池关闭了,那么此时是不知道的,就会造成获取到的状态不是实时状态,而使用一个数操作,不仅操作方便,而且也避免了数据实时性的问题。(仅代表个人理解,如果有高见请指出)

    然后我们在看上面代码,是通过addWorker(Runnable command,boolean core)方法来向线程池添加线程的。

    • 参数command:要执行的任务
    • 参数core: 是否是核心线程,因为核心线程是不会销毁的,而不是核心线程,当空闲时间超过存活时间会自动销毁。

    查看addWorker方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
      			// 自旋,保证修改当前线程数一定能成功
            for (;;) {
              // 获取线程池状态,如果线程池已经被关闭,那么就不添加到线程池了
                int c = ctl.get();
                int rs = runStateOf(c);
    
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
                for (;;) {
                  // 如果超过线程数限制返回失败,是核心线程就比较核心线程数,不是核心线程就比较最大线程数
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                  // 线程数+1
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
      			// 走到这里才真正想线程池中添加线程
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
              // 创建一个新的线程,线程池中使用Worker创建线程,并将当前任务作为线程的第一个任务
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                          // 将线程添加到线程池
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                      // 启动线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    上面创建线程池中的线程使用的是Worker来创建线程。那么创建的Worker都存放在线程池,线程池的实现就是:private final HashSet<Worker> workers = new HashSet<Worker>();这个hashSet就是线程池。查看Worker的代码:

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
              // 设置状态为-1表示此worker为新建状态,还未开启线程,不支持中断
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    可以看出Worker也继承自AQS,我们学习AQS时说AQS的核心就是state。在线程池中这个state就代表当前线程是否被使用。而且Worker也实现了Runnable接口,因此它自身就是一个线程任务,他内部的属性Thread在创建时调用了线程工程创建一个新线程。因此说一个Worker就是代表了一个可执行线程。而且这个可执行线程的run方法是:

            public void run() {
                runWorker(this);
            }
    

    查看runWorker方法:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
      // 获取当前线程的第一个任务并将firstTask属性设置为null(也就是取出了worker的第一个任务)
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
              // 如果worker的第一个任务不等于空 或者 从等待队列取到的任务不是空
                while (task != null || (task = getTask()) != null) {
                  // 执行线程的run方法。
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                      //调用钩子函数
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                          //运行run方法
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                          // 调钩子函数 
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
              // 控制线程退出
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    runWorker方法中我们学习到了线程池是如何复用线程的。当一个worker的线程被启动时,会调用runWorker方法,然后内部流程为:

    1. 取出worker的firstTask。
    2. firstTask是否为空,如果为空则跳至4。
    3. 执行firstTask的run方法。
    4. 从等待队列取出task,如果task不为空则跳至2,如果为空则往下。
    5. 控制线程退出。

    流程图如下:

    image-20210122094621712

    如果程序运行过程中出现异常,或者等待队列为空时就要退出异常线程或空闲线程。退出空闲线程的流程为:

    1. 当前线程是否需要退出(异常结束线程直接退出)
    2. 从线程池移除当前线程
    3. 如果当前线程池的线程数小于核心线程数,那么添加一个任务为空的worker

    ExecutorCompletionService

    在我们执行有返回值的任务时,会返回一个Future类型,可以通过future.get()方法获取返回值,但是这个get()方法时阻塞的。因此就可能会出现其他任务已经完成结果,但是我们还在阻塞等待前一个任务的结果。这样算是间接的浪费了时间。查看如下代码:

    package com.xiazhi.pool;
    
    import java.util.concurrent.*;
    
    /**
     * 假设我们现在在网上购买了电器: 电视,冰箱,洗衣机。这三个东西是一块发货的,因此可以认为这就是一个异步执行。
     * 冰箱: 发货 -> 收货 10s
     * 洗衣机: 发货-> 收货 15s
     * 电视: 发货 -> 收货 8s
     * 我们将电器从楼下搬到楼上的时间为3s
     *
     * @author 赵帅
     * @date 2021/1/22
     */
    public class NormalExecutorService {
    
        /**
         * 为了方便使用Executors,实际工作不建议使用
         */
        final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        static void transport(String shop) {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("搬运" + shop);
        }
    
        /**
         * 运输冰箱
         */
        static String transportFridge() {
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("冰箱到了");
            return "冰箱";
        }
    
        /**
         * 运输电视
         */
        static String transportTV() {
    
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("电视到了");
            return "TV";
        }
    
        /**
         * 运输洗衣机
         */
        static String transportWashing() {
    
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("洗衣机到了");
            return "洗衣机";
        }
    
    
        /**
         * 根据上面的题意,如果我们用正常的Future.get()方法去实现的话:
         */
        void way1() throws ExecutionException, InterruptedException {
            
            long start = System.currentTimeMillis();
            Future<String> fridge = executorService.submit(NormalExecutorService::transportFridge);
            Future<String> washing = executorService.submit(NormalExecutorService::transportWashing);
            Future<String> tv = executorService.submit(NormalExecutorService::transportTV);
    
            transport(fridge.get());
            transport(washing.get());
            transport(tv.get());
            System.out.println("从下单到搬运到家共计耗时:" + (System.currentTimeMillis() - start));
            executorService.shutdown();
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NormalExecutorService service = new NormalExecutorService();
            service.way1();
        }
    }
    

    根据执行结果可以看出,电视先到了,但是并没有先把电视机搬上楼,而是等待冰箱到了之后才搬运冰箱,然后等待洗衣机到搬运洗衣机,最后才搬运电视。总共耗时21秒。

    或许我们会想调换get的顺序,先搬运电视不就好了。现在是我们知道每个任务的时间,我们或许可以调整,但是在实际工作中,大多数并行情况我们并不能确定执行的时间(网络延迟等影响),因此我们也就无法确定我们应该先等待哪儿个到达。

    正确的处理思路是先到哪儿个先搬哪儿个。ExecutorCompletionService就是这样的一个作用。我们使用ExecutorCompletionService实现上面的过程:

    package com.xiazhi.pool;
    
    import java.util.concurrent.*;
    
    /**
     * 假设我们现在在网上购买了电器: 电视,冰箱,洗衣机。这三个东西是一块发货的,因此可以认为这就是一个异步执行。
     * 冰箱: 发货 -> 收货 10s
     * 洗衣机: 发货-> 收货 15s
     * 电视: 发货 -> 收货 8s
     * 我们将电器从楼下搬到楼上的时间为3s
     *
     * @author 赵帅
     * @date 2021/1/22
     */
    public class ExecutorCompolationServiceDemo {
        /**
         * 为了方便使用Executors,实际工作不建议使用
         */
        final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        static void transport(String shop) {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("搬运" + shop);
        }
    
        /**
         * 运输冰箱
         */
        static String transportFridge() {
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("冰箱到了");
            return "冰箱";
        }
    
        /**
         * 运输电视
         */
        static String transportTV() {
    
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("电视到了");
            return "TV";
        }
    
        /**
         * 运输洗衣机
         */
        static String transportWashing() {
    
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("洗衣机到了");
            return "洗衣机";
        }
    
    
        /**
         * 使用executorCompletionService实现
         */
        void way2() throws ExecutionException, InterruptedException {
            ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    
            long start = System.currentTimeMillis();
            completionService.submit(NormalExecutorService::transportFridge);
            completionService.submit(NormalExecutorService::transportWashing);
            completionService.submit(NormalExecutorService::transportTV);
    
            for (int i = 0; i < 3; i++) {
                transport(completionService.take().get());
            }
    
            System.out.println("从下单到搬运到家共计耗时:" + (System.currentTimeMillis() - start));
            executorService.shutdown();
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorCompolationServiceDemo service = new ExecutorCompolationServiceDemo();
            service.way2();
        }
    }
    

    可以看到执行时间在18秒,提高了3秒。

    ForkJoinPool

    forkjoin是指任务的拆分和合并。forkJoinPool会将一个任务拆分成多个任务队列,也就是fork,每一个任务队列是一个线程,当一个任务队列中的任务执行完之后,会从其他任务队列中偷取任务执行,然后执行结果再进行合并也就是join。java8新特性中的并行流就是通过forkJoinPool实现的。

    package com.xiazhi.pool;
    
    import java.util.Arrays;
    
    /**
     * @author 赵帅
     * @date 2021/1/22
     */
    public class ParallelStreamDemo {
        static String[] array = new String[1_000_000];
    
        public static void main(String[] args) {
            for (int i = 0; i < array.length; i++) {
                array[i] = "hello" + i;
            }
    
            Arrays.stream(array).parallel().forEach(f->{
                System.out.println(Thread.currentThread().getName() + ":value:" + f);
            });
        }
    }
    

    观察结果可以看到启动了很多的线程

  • 相关阅读:
    codeblocks 更换颜色主题
    python3 回顾笔记1
    linux查找目录下的所有文件中是否含有某个字符串
    jupyter notebook 远程访问
    ubuntu ufw防火墙
    加载大量的xml数据 使用压缩方法解决(当然较小时也可以压缩)
    lua string介绍
    Lua和C++交互详细总结
    编写高性能的 Lua 代码
    lua中遍历table的几种方式比较
  • 原文地址:https://www.cnblogs.com/Zs-book1/p/14312491.html
Copyright © 2011-2022 走看看