zoukankan      html  css  js  c++  java
  • 线程池

      关于线程池的源码分析,在这里也没认真说明,后面单独起一片文章进行研究。

    1.大纲

      线程池介绍

      创建与停止线程池

      常见的线程池特点与用法

      任务太多,怎么拒绝

      钩子方法

      实现原理,源码分析

      使用线程池的主要点

    一:介绍

    1.重要性

      使用中重要

      面试中重要

    2.池

      线程可以复用

      可以控制资源的总量

    3.不使用线程池些的程序

      这里有两个程序,只粘贴进行循环对每个任务进行创建线程,并执行

    package com.jun.juc.threadpool;
    
    /**
     * for循环执行每一个任务的线程
     * 可以正常的执行,但是有些问题
     * 开销大,反复的操作系统进行创建与销毁
     */
    public class ForLoop {
        public static void main(String[] args) {
            for (int i = 0; i< 10000; i++){
                Thread thread = new Thread(new Task());
                thread.start();
            }
        }
    
        static class Task implements Runnable{
            @Override
            public void run() {
                System.out.println("执行了任务");
            }
        }
    }
    

      

    4.为什么使用线程池

      反复的创建,开销大

        让一部分的线程保持工作,反复的执行任务

      过多的线程会占用太多的内存

        使用少量的线程

      

    5.线程池的好处

      加快响应速度

      更好的利用CPU,与内存。选择合适的线程数

      统一管理

    6.使用场景

      服务器接收大量的请求

      多个线程的创建

    二:创建与停止线程池

    1.线程池的构造函数的参数

      corePoolSize:核心线程数,int

      maxPoolSize:最大的线程数,int

      keepAliveTime:存活时间,long

      workQueue:任务存储队列,BlockingQueue

      threadFactory:工厂类,ThreadFactory

      Handler:拒绝策略,RejectedExecutionHandler

    2.corePoolSize

      线程池进行初始化的时候,线程池里没有任何的线程,线程池会等待有任务到来的时候,再进行创建新线程执行任务

      

    3.macPoolSize

      线程池有可能子啊核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一定的上限,这个就是最大量

      如果超过了corePoolSize的时候,先将任务放到队列中。

      队列中满了,才会去看

    4.添加线程的规则

      如果线程小于corePoolSize的时候,即使线程有处于空闲状态,也会继续创建新的线程运行新的任务

      如果等于大于corePoolSize,但是小于maxPoolSize,放入队列

      如果队列已满,并且线程小于maxPoolSize,创建新的线程

      

    5.keepAliveTime

      主要看是控制的是谁。

      如果线程池的当前的线程数多于了corePoolSize,那么多于的线程空闲时间超过keepAliveTime,将会被终止

      减少资源消耗

    6.ThreadFactory

      新的线程默认使用Exectors.defaultThreadFactory(),创建的线程都在一个线程组,拥有相同的NORM_PRIORITY优先级,并且都不是守护线程

    7.workQueue

      工作队列

      最常见的队列类型:

        直接交换:SynchronousQueue,内部没有容量

        无界队列:LinkedBlockingQueue

        有界队列:ArrayBlockingQueue

    三:常见的线程池

    1.FixedThreadPool创建

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

      可以看见,corePoolSize与maxPoolSize是相等的

      然后使用的是无界队列。

      由于传递进去的任务是没有容量上限的,可能占用大量的内存,出现OOM

    2.演示溢出

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * FixedThreadPool的使用场景
     */
    public class FixedThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for (int i=0; i<Integer.MAX_VALUE;i++){
                executorService.execute(new Task());
            }
        }
    
        static class Task implements Runnable{
            @Override
            public void run() {
                try {
                    // 让这个任务执行的很慢,表示队列中会一直增加
                    Thread.sleep(500000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行了任务");
            }
        }
    }
    

      效果:

    D:jdk1.8.0_144injava -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:63206,suspend=y,server=n -Xmx8m -Xms8m -Dfile.encoding=UTF-8 -classpath "D:jdk1.8.0_144jrelibcharsets.jar;D:jdk1.8.0_144j
    Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    	at com.jun.juc.threadpool.FixedThreadPoolTest.main(FixedThreadPoolTest.java:13)
    

      

    3.SingleThreadExector的使用

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

      说明:

      相比于FixedThreadPool,只是corePoolSize与macPoolSize都是1,其他不变

    4.也会出现OOM

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class SingleThreadExectos {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i=0; i<Integer.MAX_VALUE;i++){
                executorService.execute(new FixedThreadPoolTest.Task());
            }
        }
    
        static class Task implements Runnable{
            @Override
            public void run() {
                try {
                    // 让这个任务执行的很慢,表示队列中会一直增加
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行了任务");
            }
        }
    }
    

      

    5.CachedThreadPool

      可缓存线程池,无界的线程池,可以自动回收多于线程的功能

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

      其中,最大的线程数没有限制,也是一个大的弊端。如果任务量过大,一样会出现的是OOM

    6.测试

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class CachedThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i=0; i< 1000; i++){
                executorService.execute(new Task());
            }
        }
    
        static class Task implements Runnable{
            @Override
            public void run() {
                try {
                    // 让这个任务执行的很慢,表示队列中会一直增加
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread()+"执行了任务");
            }
        }
    
    
    }
    

      效果:

    Thread[pool-1-thread-495,5,main]执行了任务
    Thread[pool-1-thread-491,5,main]执行了任务
    Thread[pool-1-thread-511,5,main]执行了任务
    Thread[pool-1-thread-508,5,main]执行了任务
    Thread[pool-1-thread-507,5,main]执行了任务
    Thread[pool-1-thread-499,5,main]执行了任务
    Thread[pool-1-thread-519,5,main]执行了任务
    Thread[pool-1-thread-522,5,main]执行了任务
    Thread[pool-1-thread-518,5,main]执行了任务
    Thread[pool-1-thread-510,5,main]执行了任务
    

      创建了很多的线程

    7.ScheduledThreadPool

      支持定时与周期性的执行的线程池

      核心线程是传递过去的,但是最大的核心线程数是INTEGER.MAX_VALUE

    8.延迟一定时间之后运行

      定时

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class ScheledThreadPoolTest {
        public static void main(String[] args) {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
            scheduledExecutorService.schedule(new Task(),6, TimeUnit.SECONDS);
            
        }
    
        static class Task implements Runnable{
            @Override
            public void run() {
                try {
                    // 让这个任务执行的很慢,表示队列中会一直增加
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread()+"执行了任务");
            }
        }
    
    
    }
    

      

    9.周期性的运行

      周期

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class ScheledThreadPoolTest {
        public static void main(String[] args) {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
            scheduledExecutorService.schedule(new Task(),6, TimeUnit.SECONDS);
            // 周期的运行
            scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
        }
    
        static class Task implements Runnable{
            @Override
            public void run() {
                try {
                    // 让这个任务执行的很慢,表示队列中会一直增加
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread()+"执行了任务");
            }
        }
    
    
    }
    

      

    10.总结

      

       主要注意的是CacheDThreadPool与ScheduleThreadPool的对比。

      CachedThreadPool为啥使用SynchronousQueue,因为有任务不需要进行存储,直接交给线程执行就行了。

      ScheduledThreadPool使用的是延迟队列DelayedWorkQueue

     

    四:线程池中的线程数量设定

    1.计算密集型的

      为cpu核心数的1~2倍

    2.耗时IO型的

      最佳线程数一般大于cpu很多倍。

      以jvm线程监控显示繁忙情况为依据,参考brain goetz推荐的计算方法

    3.计算方法

      cpu核心数 * (1+平均等待时间/平均工作时间)

    五:停止线程池

    1.shutdown

      要线程中,会队列中的线程任务都执行完成后,再进行停止

      对拒绝新的任务

    2.测试

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * shutdown关闭
     */
    public class ShutDown {
        public static void main(String[] args) throws Exception{
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i=0; i< 1000; i++){
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(2000);
            executorService.shutdown();
            executorService.execute(new ShutDownTask());
        }
    
        static class ShutDownTask implements Runnable {
            @Override
            public void run(){
                try {
                    Thread.sleep(50);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

      效果:

      可以发现,在执行一段时间后,就可以发现,真的不再进行接收任务了

    pool-1-thread-1
    pool-1-thread-1
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jun.juc.threadpool.ShutDown$ShutDownTask@ed17bee rejected from java.util.concurrent.ThreadPoolExecutor@2a33fae0[Shutting down, pool size = 1, active threads = 1, queued tasks = 960, completed tasks = 39]
    	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    	at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
    	at com.jun.juc.threadpool.ShutDown.main(ShutDown.java:17)
    pool-1-thread-1
    pool-1-thread-1
    

      

    3.isShutdown

      可以知道线程被停止过了

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * shutdown关闭
     */
    public class ShutDown {
        public static void main(String[] args) throws Exception{
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i=0; i< 100; i++){
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(2000);
            System.out.println(executorService.isShutdown());
            executorService.shutdown();
            System.out.println(executorService.isShutdown());
            executorService.execute(new ShutDownTask());
        }
    
        static class ShutDownTask implements Runnable {
            @Override
            public void run(){
                try {
                    Thread.sleep(50);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

      效果:

       先false,然后true

    4.isTerminated

      返回是否真正的结束

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * shutdown关闭
     */
    public class ShutDown {
        public static void main(String[] args) throws Exception{
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i=0; i< 10; i++){
                executorService.execute(new ShutDownTask());
            }
            System.out.println(executorService.isShutdown());
            executorService.shutdown();
            System.out.println(executorService.isShutdown());
            Thread.sleep(2000);
            System.out.println(executorService.isTerminated());
        }
    
        static class ShutDownTask implements Runnable {
            @Override
            public void run(){
                try {
                    Thread.sleep(50);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

        效果:

    D:jdk1.8.0_144injava -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:62572,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "D:jdk1.8.0_144jrelibcharsets.jar;D:jdk1.8.0_144jrelibdeploy.jarConnected to the target VM, addre
    true
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    pool-1-thread-1
    Disconnected from the target VM, address: '127.0.0.1:62572', transport: 'socket'
    true
    
    Process finished with exit code 0
    

      

     5.awaitTermination

      所有的任务都执行完毕,等待的时间到了,等待过程中被打断都会返回,否则阻塞。

      说明:8秒内,如果关闭了线程,并且都执行完成返回true,否则是false

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * shutdown关闭
     */
    public class ShutDown {
        public static void main(String[] args) throws Exception{
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i=0; i< 10; i++){
                executorService.execute(new ShutDownTask());
            }
            executorService.shutdown();
            boolean b = executorService.awaitTermination(8L, TimeUnit.SECONDS);
            System.out.println("b="+b);
        }
    
        static class ShutDownTask implements Runnable {
            @Override
            public void run(){
                try {
                    Thread.sleep(50);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

      

     6.shutdownNow

      立刻关闭线程

      存在返回未执行的任务。

    package com.jun.juc.threadpool;

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
    * shutdown关闭
    */
    public class ShutDown {
    public static void main(String[] args) throws Exception{
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    for (int i=0; i< 10; i++){
    executorService.execute(new ShutDownTask());
    }
    // shutdownNow
    List<Runnable> runnables = executorService.shutdownNow();
    ExecutorService executorService2 = Executors.newSingleThreadExecutor();
    runnables.forEach(item -> executorService2.execute(item));
    }

    static class ShutDownTask implements Runnable {
    @Override
    public void run(){
    try {
    Thread.sleep(50);
    System.out.println(Thread.currentThread().getName());
    } catch (InterruptedException e) {
    System.out.println("线程终端了");
    }
    }
    }
    }

      

    六:拒绝策略

     1.拒绝时机

      Executor关闭时

      最大线程和队列已满

    2.拒绝策略

      AbsortPolicy:直接抛出异常

      DiscradPolicy:默默丢弃

      DiscardOldestPolicy:丢弃最老的任务

      CallerRunsPolicy:谁提交的任务,则有谁进行运行,这样可以降低提交速度

    七:钩子方法

    1.说明

      在任务的前后

      日志,统计

    2.暂停线程池

    package com.jun.juc.threadpool;
    
    import java.util.concurrent.*;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class PauseableThreadPool extends ThreadPoolExecutor {
        /**
         * 并发加锁
         */
        private final ReentrantLock lock = new ReentrantLock();
    
        private Condition unpaused = lock.newCondition();
    
        /**
         * 是否暂停
         */
        private boolean isPaused;
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        /**
         * 执行之前,暂停
         *
         * @param t
         * @param r
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            lock.lock();
            try {
                while (isPaused) {
                    unpaused.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    
        /**
         * 暂停
         */
        private void pause() {
            lock.lock();
            try {
                isPaused = true;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 恢复
         */
        public void resume(){
            lock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
            Runnable task = new Runnable(){
                @Override
                public void run() {
                    System.out.println("开始执行了");
                    try {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            };
            for(int i=0; i<1000; i++){
                pauseableThreadPool.execute(task);
            }
            Thread.sleep(1000);
            pauseableThreadPool.pause();
            System.out.println("线程池被暂停了");
            Thread.sleep(1000);
            pauseableThreadPool.resume();
            System.out.println("线程池被再次执行了");
        }
    
    }

      效果:

    开始执行了
    开始执行了
    线程池被暂停了
    线程池被再次执行了
    开始执行了
    开始执行了

    八:源码

    1.组成部分

      线程池管理器

      工作线程

      任务队列

      任务接口

    2.Exector家族

      

    3.Exector

      顶层接口,只有一个方法

     * @since 1.5
     * @author Doug Lea
     */
    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

      

    4.ExecuorService

      继承了Excetor,然后增加了几个新的方法

      初步的有了管理线程池的方法

      

    5.Excetors  

      这是一个工具类

      进入可以发现是使用ThreadPoolExector进行创建的线程

    6.线程池实现任务复用的原理

      execute:

      

      添加到worker

      

      

      進行运行:

      

    九:线程池状态

    1.线程池状态

      Running:接收新任务并处理排队任务

      SHUTDOWN:不接受新任务,但是处理排队任务

      stop:不接受新任务,也不处理排队任务,并中断正在进行的任务

      tidying:所有的任务都已经终结,workerCount为零时,线程就会转为这个状态,并且运行terminate()方法

      TERMIMATED:运行完成

    2.状态值

       

      

      

      

        

  • 相关阅读:
    node.js是什么
    python基础 filter ,列表,字典,集合 中根据 条件 筛选 数据
    nginx 自动补全www,当不输入www时候自动补全www
    python爬虫,接口是post请求,参数是request payload 的形式,如何传参
    python使用with开启线程锁
    linux nohup后台执行脚本并指定文件输出 ,nohup 修改默认日志输出文件
    python线程锁
    nginx yum安装启动
    redis desktop manager 远程连接服务器上的redis
    职位列表中英对照
  • 原文地址:https://www.cnblogs.com/juncaoit/p/12866361.html
Copyright © 2011-2022 走看看