zoukankan      html  css  js  c++  java
  • 功能:Java线程池

    Java线程池

    一、介绍

    线程池,顾名思义,这是管理一堆线程而出现的对象。与数据库的连接池一致,它的出现解决了线程的频繁创建和销毁,从而浪费大量资源的问题。

    所以,线程池中有提前创建好的线程,使用时直接分配获取,使用完再由线程池管理是否销毁。

    优点

    • 降低资源消耗,也就是不需要重复多次的创建线程
    • 更好的管理线程
      • 比如可以获取当前运行的线程是什么
      • 还在等待执行的任务有什么

    二、使用线程池

    在JDK5起提供了线程池的对象,ExecutorServiceExecutors

    其中,ExecutorService和它的子类ThreadPoolExecutor是线程池的关键

    Executors是对应的工具类,里面有些工厂方法可以快速创建线程池

    查看ThreadPoolExecutor的构造方法

    public class ThreadPoolExecutor extends AbstractExecutorService {
        
    	public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {}
    }
    
    参数 说明
    corePoolSize 核心线程数。就算目前空闲,也不会回收这几个线程
    maximumPoolSize 最大线程数。当前线程池可以容纳的最大线程数量
    keepAliveTime 线程保持存活时间。如果线程空间时间到达,将会进行销毁(除了核心线程)
    unit keepAliveTime一起使用,仅是个时间单位
    workQueue 工作等待队列。当线程池所有的线程都繁忙运行时,新添加的执行任务会暂时保留至此队列
    threadFactory 创建线程的线程工厂
    handler 拒绝策略。当队列满了后,还有执行任务进入时的策略

    workQueue参数需要传入一个BlockingQueue,这是个阻塞队列。BlockingQueue内部使用两条队列,允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率,不能传入空对象,可设置容量大小,也可以不设置容量大小,那么它的容量就是Integer.MAX_VALUE。常用的几种实现类

    说明
    ArrayBlockingQueue 规定容量大小的阻塞队列
    LinkedBlockingQueue 既可以规定容量大小,也可以不规定的阻塞队列
    SynchronizedQueue 一个特殊的队列,生产消费必须交替完成的队列
    生产一个元素后,必须要有进行消费后,才能继续往队列内生产元素

    handler拒绝策略

    当线程池指定的队列容量满了时,将执行哪种拒绝任务的策略

    策略类 说明
    AbortPolicy 默认,不执行新任务,直接抛出异常,提示线程池已满
    DiscardPolicy 不执行新任务,也不抛出异常
    DiscardOldestPolicy 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。
    CallerRunsPolicy 直接在外层调用者的线程中调用新任务

    1)小试牛刀

    package com.banmoon.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Demo1 {
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(5);
    
            executorService.execute(new MyRunnable());
            executorService.execute(new MyRunnable());
            executorService.execute(new MyRunnable());
            executorService.execute(new MyRunnable());
            executorService.execute(new MyRunnable());
            // lambda表达式
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName());
            });
            // 关闭线程池,如果不关闭,线程池将一直存在,池子内保留着核心线程,等待着调用
            executorService.shutdown();
        }
    
    }
    
    class MyRunnable implements Runnable{
    
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }
    

    image-20211211195656264

    2)Executors工具类

    关于此的三个相关方法源码,其中还有一些他们的重载,这边就不细细讲了。

    这些工具类方法,主要是快速创建ThreadPoolExecutor对象的方法,只是它们的参数各有所不同

    public class Executors {
        
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    }
    
    方法 参数说明 效果
    newCachedThreadPool 核心线程数为0
    最大线程数已调到Integer.MAX_VALUE
    每提交一个线程任务,都将新创建一个新的线程来执行
    如果需要执行的任务很多,这有可能会导致CPU100%的问题
    newFixedThreadPool 核心线程数和最大线程数一致
    但队列长度为Integer.MAX_VALUE
    提交的任务将正常交给池子中的线程执行,执行完成也不会销毁,等待执行新的任务
    如果执行的任务很多,队列会一直添加任务等待执行,可能会造成内存溢出的问题
    newSingleThreadExecutor 核心线程数和最大线程数都为1
    但队列长度为Integer.MAX_VALUE
    newFixedThreadPool类似,但池子中只有一个线程

    根据需要来进行使用合适的线程池,测试下他们的执行方式和快慢

    package com.banmoon.pool;
    
    import cn.hutool.core.date.DateUtil;
    import cn.hutool.core.util.StrUtil;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Demo2 {
    
        public static void main(String[] args) {
            ExecutorService executorService1 = Executors.newCachedThreadPool();
            ExecutorService executorService2 = Executors.newFixedThreadPool(10);
            ExecutorService executorService3 = Executors.newSingleThreadExecutor();
    
            for (int i = 0; i < 100; i++) {
                executorService1.execute(new MyDemo2(i));
    //            executorService2.execute(new MyDemo2(i));
    //            executorService3.execute(new MyDemo2(i));
            }
    
            executorService1.shutdown();
            executorService2.shutdown();
            executorService3.shutdown();
    
        }
    
    }
    
    class MyDemo2 implements Runnable {
    
        private Integer i;
    
        public MyDemo2(Integer i) {
            this.i = i;
        }
    
        @Override
        public void run() {
            System.out.println(StrUtil.format("{}:{},时间:{}", Thread.currentThread().getName(), i, DateUtil.now()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    newCachedThreadPool执行结果可以看到,一共有100个线程被创建出来

    image-20211212150430276

    newFixedThreadPool执行结果,执行的永远都是那几个固定的线程,这里我们指定了10个线程,所以打印也是10个为一批来进行的。

    image-20211212150922546

    newSingleThreadExecutor执行结果,从头到尾就只有一个线程在执行

    image-20211212154044131

    3)线程工厂

    虽然有默认的线程工厂,但如果有需要进行处理的话,还是得记录一下

    package com.banmoon.pool;
    
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Demo3 {
    
        public static void main(String[] args) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST"));
            for (int i = 0; i < 100; i++) {
                executor.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            executor.shutdown();
        }
    
    }
    
    
    class MyThreadFactory implements ThreadFactory{
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private String poolName;
    
        MyThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            this.poolName = poolName;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            String threadName = poolName + "-" + threadNumber.getAndIncrement();
            Thread t = new Thread(group, r, threadName, 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    
    }
    

    执行结果

    image-20211212163007164

    4)拒绝策略

    拒绝策略没什么好讲的,平常在使用时,注意下容量的大小,以及使用的策略。自己需要执行的任务数量多少,会不会照成内存溢出等,从这几个方面入手,选择最适合业务的队列容量和拒绝策略。

    策略类 说明
    AbortPolicy 默认,不执行新任务,直接抛出异常,提示线程池已满
    DiscardPolicy 不执行新任务,也不抛出异常
    DiscardOldestPolicy 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。
    CallerRunsPolicy 直接在外层调用者的线程中调用新任务

    演示CallerRunsPolicy,会在调用者的线程中,执行超出容量的任务

    package com.banmoon.pool;
    
    import java.util.concurrent.*;
    
    public class Demo4 {
    
        public static void main(String[] args) {
            ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 1; i <= 100; i++) {
                executorService.execute(new MyDemo4(i));
            }
            executorService.shutdown();
        }
    
    }
    
    class MyDemo4 implements Runnable{
    
        private Integer i;
    
        public MyDemo4(Integer i) {
            this.i = i;
        }
    
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ":" + i);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    执行结果,上述线程池指定了最大线程数为20,队列容量为20。所以当执行第41个任务时,队列满了,将由调用者的线程来执行这个任务,此处是主线程

    image-20211212164523094

    三、其他

    1)执行任务的优先级

    public class ThreadPoolExecutor extends AbstractExecutorService {
        
        public void execute(Runnable command) {
            // 判断是否为空
            if (command == null)
                throw new NullPointerException();
            
            // 判断当前正在运行的线程数是否小于核心线程数
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                // 添加任务至线程执行,成功添加则结束
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 如果核心线程都有在运行,将任务放至队列中
            if (isRunning(c) && workQueue.offer(command)) {
                // 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 如果队列推入任务失败了,那将直接添加至线程执行
            else if (!addWorker(command, false))
                // 如果任务添加至线程失败,则将进行拒绝策略
                reject(command);
        }
        
        /**
         * 会从线程工厂获取线程,并添加执行任务
         * @param firstTask 执行的任务
         * @param core 是否可以添加至核心线程
         * @return true:成功添加至线程执行
         */
        private boolean addWorker(Runnable firstTask, boolean core) {
            // ...
        }
    }
    

    image-20211212183817010

    四、最后

    线程池这东西干货还是挺多的,还有挺多没有整理完。比如说addWorker方法,线程池的执行调度等

    后续有什么新的理解继续补上,未完待续

    欢迎来登录我的个人博客

    入我相思门,知我相思苦, 长相思兮长相忆,短相思兮无穷极, 早知如此绊人心,何如当初莫相识。
  • 相关阅读:
    python编程学习进度七
    python编程学习进度六
    SOA——2020.5.15
    代码大全001/
    Refined Architecture阶段——细化架构
    架构即未来003(摘自网络)
    我对外包公司的小小看法
    架构即未来002
    每日日报
    架构即未来阅读笔记001
  • 原文地址:https://www.cnblogs.com/banmoon/p/15727037.html
Copyright © 2011-2022 走看看