zoukankan      html  css  js  c++  java
  • java-线程池

    一  线程池的概念

            线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,

    从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。

    通俗点说就是当有工作来,就会向线程池拿一个线程,当工作完成后,并不是直接关闭线程,而是将这个线程归还给线程池供其他任务使用。

    二  创建线程池

    可通过  ThreadPoolExecutor 创建线程池

    我们看一下该类的构造器

     public ThreadPoolExecutor(int paramInt1, int paramInt2, long paramLong, TimeUnit paramTimeUnit,
                BlockingQueue<Runnable> paramBlockingQueue, ThreadFactory paramThreadFactory,
                RejectedExecutionHandler paramRejectedExecutionHandler) {
            this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
            this.mainLock = new ReentrantLock();
            this.workers = new HashSet();
            this.termination = this.mainLock.newCondition();
            if ((paramInt1 < 0) || (paramInt2 <= 0) || (paramInt2 < paramInt1) || (paramLong < 0L))
                throw new IllegalArgumentException();
            if ((paramBlockingQueue == null) || (paramThreadFactory == null) || (paramRejectedExecutionHandler == null))
                throw new NullPointerException();
    
            this.corePoolSize = paramInt1;
            this.maximumPoolSize = paramInt2;
            this.workQueue = paramBlockingQueue;
            this.keepAliveTime = paramTimeUnit.toNanos(paramLong);
            this.threadFactory = paramThreadFactory;
            this.handler = paramRejectedExecutionHandler;
        }
    对构造器内各参数进行解释

    corePoolSize : 线程池的核心池大小,在创建线程池之后,线程池默认没有任何线程(数量为0)。
    当任务过来时就会去创建线程,直到到达corePoolSize的大小,才会往阻塞队列里插入
    当然如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程

    maximumPoolSize:线程池允许的最大线程数,他表示最大能创建多少个线程。maximumPoolSize肯定是大于等于corePoolSize
    这个值一般在任务队列满了以后,才会用到(任务队列满了后,新任务会创建新线程,直到所有线程数等于
    maximumPoolSize

    keepAliveTime :
    表示线程没有任务时最多保持多久然后停止。默认情况下,只有线程池中线程数大于corePoolSize 时,

    keepAliveTime 才会起作用。
    换句话说,当线程池中的线程数大于corePoolSize,并且一个线程空闲时间达到了keepAliveTime,
    那么就是shutdown(把线程kill掉,移除线程池)


    Unit : keepAliveTime 的单位。

    workQueue :一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数超过它的corePoolSize的时候,
    线程会进入阻塞队列进行阻塞等待。
    通过workQueue,线程池实现了阻塞功能

    threadFactory 
    线程工厂,用来创建线程(也可以自定义线程工厂,指定线程名称格式等)

    handler :
    表示当拒绝处理任务时的策略。


    任务缓存队列

    在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

    workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

    1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

    2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

    3)直接提交队列synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

    拒绝策略(handler 的值

    AbortPolicy:丢弃任务并抛出RejectedExecutionException,默认策略

    CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

    DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

    DiscardPolicy:丢弃任务,不做任何处理。

    线程池的任务处理策略:

        如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

    如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,

    若添加成功,则该任务会等待空闲线程将其取出去执行;

    若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

    如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

    
    

        如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;

    如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

    
    

    线程池的关闭

    
    

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    流程图

    当任务提交,当前线程池中线程数量与corePoolSize 比较


    三 4种常见的线程池(都有弊端,不推荐)

    //固定大小线程池
    Executors.newFixedThreadPool(20);
    //创建固定大小线程池
    public static ExecutorService newFixedThreadPool(int var0) {
            return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        }
    特点:

        固定大小的线程池,可以指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值。

    该线程池中的线程数量始终不变,当有新任务提交时,线程池中有空闲线程则会立即执行,如果没有,则会暂存到阻塞队列。对于固定大小的线程池,不存在线程数量的变化。

    同时使用无界的LinkedBlockingQueue来存放执行的任务。当任务提交十分频繁的时候,LinkedBlockingQueue会越来越大

    -------------------------------------     end     -------------------------------------------------------------------------------------


    
    
    //单例线程池
    Executors.newSingleThreadExecutor();
    //创建单例线程池
    public static ExecutorService newSingleThreadExecutor() {
            return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, 
    new LinkedBlockingQueue())); }
    特点:
      单个线程线程池,只有一个线程的线程池,阻塞队列使用的是LinkedBlockingQueue,若有多余的任务提交到线程池中,
    则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务。
    ------------------------------------- end ----------------------------------------------------------

    //缓存线程池
    Executors.newCachedThreadPool();
    //创建缓存线程池
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
        }
    
    
    特点:
    如果线程池里线程数量超过处理需要,可灵活回收空闲线程,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
    如果线程数不够处理新任务,则自动创建新线程。线程池中缓存的线程默认存活60秒。线程的核心池corePoolSize大小默认为0,
    核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。是一个直接提交的阻塞队列,  
     他总会迫使线程池增加新的线程去执行新的任务。在没有任务执行时,当线程的空闲时间超过keepAliveTime(60秒),则工作线程将会终止被回收,当提交新任务时,如果没有空闲线程,
    则创建新线程执行任务,会导致一定的系统开销。
    如果同时又大量任务被提交,而且任务执行的时间不是特别快,那么线程池便会新增出等量的线程池处理任务,这很可能会很快耗尽系统的资源。
    -------------------------------------     end     ----------------------------------------------------------------------------------------
     
    //定时线程池
    Executors.newScheduledThreadPool(5);
    //创建定时线程池
    public static ScheduledExecutorService newScheduledThreadPool(int var0) {
            return new ScheduledThreadPoolExecutor(var0);
        }
    特点:

        定时线程池,该线程池可用于周期性地去执行任务,通常用于周期性的同步数据。

    scheduleAtFixedRate:是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
    scheduleWithFixedDelay:是以固定的延时去执行任务,延时是指上一次执行成功之后和下一次开始执行的之前的时间。
    -------------------------------------     end     -----------------------------------------------------------------

    结论:以上几种线程池都有缺陷 

    1) FixedThreadPool(固定长度)和 SingleThreadPool(单例)
    允许的请求队列长度为Integer.MAX_VALUE, 可能会堆积大量的请求而导致OOM(内存溢出 OutOfMemery)

    2) CachedThreadPool(缓存线程池) 和 newScheduledThreadPool(定时线程池)
    允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM(内存溢出 OutOfMemery)

    所以《 阿里巴巴开发手册》强制不允许使用Executors 创建线程池,而是用ThreadPoolExecutor创建

    正确的线程池创建方法应该是

        自己指定 各参数的值,特别是 maximumPoolSize 的大小和  blockingQueue的长度防止造成OOM,示例如下

    //线程池的核心池大小
    int corePoolSize = 10;
    //线程池允许的最大线程数
    int maximumPoolSize = 20;
    //表示线程没有任务时最多保持多久然后停止
    long keepAliveTime = 60L;
    //keepAliveTime 的单位
    TimeUnit timeUnit = TimeUnit.SECONDS;
    //任务队列
    BlockingQueue<Runnable> blockingQueue = new LinkedBlockingDeque<>(50);
    ExecutorService excutorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
     timeUnit,blockingQueue);

     

    线程池的调用方式

      //线程池内加入第一个任务
    excutorService.execute(new ThreadPoolTest())
    
    //线程池内加入第二个任务
    excutorService.execute(new ThreadPoolTest())、
    
    //线程池内加入第三个任务
    excutorService.execute(new ThreadPoolTest())
    
    /**
     * 线程池测试类
     * @author hup
     * @since 2019-11-05 22:25
     */
    public class ThreadPoolTest implements Runnable {
        /**
         * 实现run 方法
         */
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName() + "线程被调用了");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

     四 线程任务提交给线程池的方法,submit()和execute()的区别

    1  入参类型不同
    excute()  入参Runnable
    submit() 入参可以为Callable,也可以为Runnable。

    2  是否有返回值  
    submit() 方法,可以提供Future < T > 类型的返回值。
    executor() 方法,无返回值。

    execute()方法源码

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();//抛掉异常
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    View Code

    submit有Future返回值 :

    /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    View Code

    3 是否抛出异常

    excute() 方法会抛出异常。
    sumbit() 方法不会抛出异常。除非你调用Future.get()。

    ------------------------------------------------ end--------------------------------------------------
    
    
    
  • 相关阅读:
    CentOS7上Mongodb安装、卸载
    CentOS7上Redis安装与配置
    vmware centos7系统虚拟机复制到其他电脑后不能联网问题解决
    流程项目点水笔记
    CentOS7图形界面启动报错unable to connect to X server
    本地Chrome测试JS代码报错:XMLHttpRequest cannot load
    iptables相关操作以及简单理解端口和服务之间关系
    git revert 和 git reset的区别
    Linux中_ALIGN宏背后的原理——内存对齐
    SPI协议及其工作原理浅析
  • 原文地址:https://www.cnblogs.com/hup666/p/11802208.html
Copyright © 2011-2022 走看看