zoukankan      html  css  js  c++  java
  • 线程池


    转至元数据结尾

     

    转至元数据起始

     

    前言

        如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

        那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

        在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

    Java中的ThreadPoolExecutor类

        java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

    构造方法

    public class ThreadPoolExecutor extends AbstractExecutorService {
        ........
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
     
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
     
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
        ........
    }

        从上面的源码中我们可以看到,ThreadPoolExecutor是继承于AbstractExecutorService类,并提供了四个构造器,事实上,前三个构造器都是通过第四个构造器进行初始化工作的。

    构造方法参数含义

    corePoolSize:线程池核心线程数大小

        这个参数与后面讲述的线程池工作原理有很大的关系。在创建了线程池之后,默认是没有任何线程的,而是等待有任务到来时才会创建线程去执行任务,除非调用 prestartAllCoreThreads() 或 者 prestartCoreThread() 方法时,会创建全部或者一个核心线程。在默认情况下,当有任务到来时,创建工作线程去执行任务,当线程池中的线程数目到达corePoolSize时,就会把提交的任务放到等待队列中。

    maximumPoolSize:线程池最大线程数

        它标识线程池中最大可创建的多少个线程

    keepAliveTime:空闲线程存活时间

        这个参数只有线程池中的线程数达到corePoolSize时才会起效,直到线程池中的线程数小于等于corePoolSize。如果调用allowCoreThreadTimeOut(boolean value)方法,则当线程池中的线程数小于等于corePoolSize,keepAliveTime参数也会起作用,直到线程池中的线程数为0

    unit:空闲线程存活时间单位

        该参数的值是TimeUnit的7个静态属性:

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒

    workQueue:任务等待队列

        一个阻塞队列,用来存储等待执行的任务。这个参数非常重要,会对线程池的运行产生很大的影响。一般来说,阻塞队列有如下几种选择:

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

        ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    threadFactory:线程工厂

        主要用来创建线程

    handler:表示当拒绝处理任务时的策略

        有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    源码解析

        从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

    public abstract class AbstractExecutorService implements ExecutorService {
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
        public Future<?> submit(Runnable task) {};
        public <T> Future<T> submit(Runnable task, T result) { };
        public <T> Future<T> submit(Callable<T> task) { };
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
        };
    }

        AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

    public interface ExecutorService extends Executor {
        // 关闭线程池,队列中已经存在的任务还可以被继续执行
        void shutdown();
        // 关闭线程池,中断正在执行的任务
        List<Runnable> shutdownNow();
        // 判断线程池是否关闭
        boolean isShutdown();
        // 判断线程池是否终止
        boolean isTerminated();
        // 设置超时终止
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
        // 提交Callable任务
        <T> Future<T> submit(Callable<T> task);
        // 提交Runnable任务,并带返回值
        <T> Future<T> submit(Runnable task, T result);
        // 提交Runnable任务,不带返回值
        Future<?> submit(Runnable task);
        // invokeAll是同步的,需要等待其他任务完成才会返回结果。而Submit是异步的
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
        // invokeAny是取第一个任务的返回值,并中断其他任务
        <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

        而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

    public interface Executor {
        // 启动任务
        void execute(Runnable command);
    }

        到这里,我们应该可以看到ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

        Executor是一个顶层接口,它只声明了一个execute(Runnable)方法,返回值是void,参数是Runnable,从字面意思可以理解,这个是用来执行传入的线程任务的。

        然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

        抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

        然后ThreadPoolExecutor继承了类AbstractExecutorService。整体类图如下:

        

        在ThreadPoolExecutor类中有几个非常重要的方法:

    // Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行
    execute()
    // ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
    submit()
    // 关闭线程池,会在线程池中的线程执行完成后关闭,执行完该方法后,不再接受新的任务
    shutdown()
    // 立即关闭线程池
    shutdownNow()

    深入剖析线程池的实现原理

    线程池状态

        在ThreadPoolExecutor中定义了几个static final变量表示线程池的各个状态:

    // 线程池使用一个int来存储线程池当前的状态和工作线程数
    // int是4字节,32位,用高三位存储线程池的工作状态,低29位存储线程池的工作线程数
    // 为什么这样?节省空间,一个int可以表述两个含义
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
     
     
    // 创建线程池后,初始时的线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 调用了shutdown()方法后,线程池状态。此时线程池不再接受新任务,它会等待所有任务执行完毕
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 调用了shotdownNow()方法后,线程池状态。此时线程池不再接收新任务,并尝试主动终止正在执行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 临时过渡状态,所有任务都执行完了,当线程池的有效线程数为0时,这个时候为该状态。执行terminated()方法,流转下一个状态
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 终止状态,terminated()方法完成后的状态
    private static final int TERMINATED =  3 << COUNT_BITS;

    任务的执行

        在了解任务提交到线程池到任务执行完成的整个过程前,我们先来看下ThreadPoolExecutor中比较重要的成员变量:

    // 任务缓存队列,用来存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue;
    // 线程池的主要状态锁,对线程池的状态及属性的变化,都要依赖这个锁来保证同步
    private final ReentrantLock mainLock = new ReentrantLock();
    // 用来存放工作集
    private final HashSet<Worker> workers = new HashSet<Worker>(); 
    // 线程空闲时间
    private volatile long keepAliveTime;
    // 是否允许为核心线程设置存活时间
    private volatile boolean allowCoreThreadTimeOut;  
    // 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   corePoolSize;
    // 线程池最大能容忍的线程数
    private volatile int   maximumPoolSize;
    // 当前线程池的线程数
    private volatile int   poolSize;
    // 线程池的拒绝策略
    private volatile RejectedExecutionHandler handler;
    // 线程工厂,用来创建线程
    private volatile ThreadFactory threadFactory; 
    // 用来记录线程池中曾经出现过的最大线程数
    private int largestPoolSize;
    // 用来记录已经执行完毕的任务个数 
    private long completedTaskCount;

        每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

        corePoolSize在很多地方都被翻译成核心线程数量,但是我的理解是这个值就是线程池的大小。举个例子:假如一个工厂有10个工人,当有人空闲的时候,就分配任务给一个工人。如果10个工人都在忙碌,就把任务置放在等待队列。如果任务繁多,就招临时工进行工作(maximumPoolSize)。当任务增长缓和,就辞退临时工,只保持10个工人。不过为了保持认知的一致性,本文还是继续将corePoolSize解释为核心线程数。

        largestPoolSize只是一个用来记录的变量,和线程池的容量没有关系。

    任务执行的生命周期

        在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可。

    execute()方法

    public void execute(Runnable command) {
        // 判断提交的任务是否为空
        if (command == null)
            throw new NullPointerException();
        // 获取线程池状态和工作线程数量结合体(下文统称为ctl)
        int c = ctl.get();
        // 判断工作线程数量是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 把任务添加Worker,添加成功则返回
            if (addWorker(command, true))
                return;
            // 添加失败,再次获取ctl
            c = ctl.get();
        }
        // 如果线程池的状态是运行中,并且向等待队列提交成功
        if (isRunning(c) && workQueue.offer(command)) {
            // double-check机制,如果线程池已经不是running状态,就要把提交的任务移除并拒绝
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 如果核心线程都超时退出,因为任务已经放入队列,所以不需要再提交一个任务,同时创建一个线程并启动
                addWorker(nullfalse);
        }
        // 执行到这里有两种情况
        // 1. 线程池已经不是RUNNING状态  2.等待队列的长度已经超过定义
        else if (!addWorker(command, false))
            reject(command);
    }

    addWorker()方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取线程池运行状态
            int rs = runStateOf(c);
     
            // Check if queue empty only if necessary.
            // 如果线程池的状态是:STOP TYDING TERMINATD状态,直接返回false,任务添加失败
            // 如果线程池的状态为SHUTDOWN 同时first!=null 或者 workQueue为空,任务添加失败。此时代表:线程池已经停止,正在等待仅有的一个任务执行完成
            // 为什么要做两次判断?有可能任务是在线程池RUNNING状态的时候将任务到队列中,但是放入完成后状态变为SHUTDOWN,此时不应该再执行新的任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
     
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
     
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
     
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  • 相关阅读:
    OpenCV教程(46) 快速特征检测
    OpenCV教程(45) harris角的检测(3)
    OpenCV教程(44) harris角的检测(2)
    OpenCV教程(43) harris角的检测(1)
    Andriod源码搜集
    OpenCV特征检测教程
    使用SGD(Stochastic Gradient Descent)进行大规模机器学习
    根据两点经纬度计算距离【转】
    转载]根据两点的经纬度求方位角和距离,等
    array
  • 原文地址:https://www.cnblogs.com/dushenzi/p/13435007.html
Copyright © 2011-2022 走看看