zoukankan      html  css  js  c++  java
  • java多线程详解(7)-线程池的使用

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

    如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,

    这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

    这个是时候我们需要使用线程池技术创建多线程。

    本文目录大纲:

    一.Java中的ThreadPoolExecutor类

    二.深入剖析线程池实现原理

    三.使用示例

    四.如何合理配置线程池的大小

     

    一.Java中的ThreadPoolExecutor类

    java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的类,因此如果要深入理解Java中的线程池,

    必须深入理解这个类。我们来看一下ThreadPoolExecutor类的源码。

    ThreadPoolExecutor类中提供了四个构造方法:

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
       
     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

    从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,

    并且前面三个构造器都是调用的第四个构造器进行的初始化工作。

     下面解释下一下构造器中各个参数的含义:

    corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,

    线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()

    或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,

    即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,

    keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,

    如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。

    但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,

    keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS;               //
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒

    workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    threadFactory:线程工厂,主要用来创建线程;

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

    具体参数的配置与线程池的关系将在下一节讲述。

    从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,看一下AbstractExecutorService的实现:

    handler:表示当拒绝处理任务时的策略,有以下四种取值:

    public abstract class AbstractExecutorService implements ExecutorService {
     
         
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
        public Future<?> submit(Runnable task) {};
        public <T> Future<T> submit(Runnable task, T result) { };
        public <T> Future<T> submit(Callable<T> task) { };
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
        };
    }

    AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

    我们看ExecutorService接口的实现:

    public interface ExecutorService extends Executor {
     
        void shutdown();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
     
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

    public interface Executor {
        void execute(Runnable command);
    }

    我们知道 ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

    Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

    然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

    抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

    然后ThreadPoolExecutor继承了类AbstractExecutorService。

    在ThreadPoolExecutor类中有几个非常重要的方法:

    execute()
    submit()
    shutdown()
    shutdownNow()

    execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,

    通过这个方法可以向线程池提交一个任务,交由线程池去执行。

    submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,

    在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,

    它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,

    只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

    shutdown()和shutdownNow()是用来关闭线程池的。

    二.深入剖析线程池实现原理

    在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:

    1.线程池状态

    2.任务的执行

    3.线程池中的线程初始化

    4.任务缓存队列及排队策略

    5.任务拒绝策略

    6.线程池的关闭

    7.线程池容量的动态调整

     

    1.线程池状态

    在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

    volatile int runState;
    static final int RUNNING    = 0;
    static final int SHUTDOWN   = 1;
    static final int STOP       = 2;
    static final int TERMINATED = 3;

    runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

    下面的几个static final变量表示runState可能的几个取值。

    当创建线程池后,初始时,线程池处于RUNNING状态;

    如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

    如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

    当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

    2.任务的执行

    在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

    private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
    private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                                  //、runState等)的改变都要使用这个锁
    private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
     
    private volatile long  keepAliveTime;    //线程存货时间   
    private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
    private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
     
    private volatile int   poolSize;       //线程池中当前的线程数
     
    private volatile RejectedExecutionHandler handler; //任务拒绝策略
     
    private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
     
    private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
     
    private long completedTaskCount;   //用来记录已经执行完毕的任务个数

    每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

    corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小

    3.线程池中的线程初始化

    默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

    在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

    prestartCoreThread():初始化一个核心线程;

    prestartAllCoreThreads():初始化所有核心线程

    下面是这2个方法的实现:

    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null); //注意传进去的参数是null
    }
     
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
            ++n;
        return n;
    }

    注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

    r = workQueue.take();

    即等待任务队列中有任务。

    4.任务缓存队列及排队策略

    在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

    workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

    (1).ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

    (2).LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

    (3).synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

    5.任务拒绝策略

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    6.线程池的关闭

    ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    7.线程池容量的动态调整

    ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

    setCorePoolSize:设置核心池大小

    setMaximumPoolSize:设置线程池最大能创建的线程数目大小

    当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

    三.使用实例

    public class Test {
         public static void main(String[] args) {   
             ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                     new ArrayBlockingQueue<Runnable>(5));
              
             for(int i=0;i<15;i++){
                 MyTask myTask = new MyTask(i);
                 executor.execute(myTask);
                 System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                 executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
             }
             executor.shutdown();
         }
    }
     
     
    class MyTask implements Runnable {
        private int taskNum;
         
        public MyTask(int num) {
            this.taskNum = num;
        }
         
        @Override
        public void run() {
            System.out.println("正在执行task "+taskNum);
            try {
                Thread.currentThread().sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"执行完毕");
        }
    }

    从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。

    如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

    不过在java中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

    Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
    Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
    Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

    下面是这三个静态方法的具体实现;

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    代码如下:

    /**
     * 通过线程池实现多线程
     * 
     * @author cary
     * @version 1.0.0
     */
    public class ThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
            ExecutorService threadPool2 = Executors.newCachedThreadPool();
            ExecutorService threadPool = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                final int task = i;
                threadPool.execute(new Runnable() {
    
                    @Override
                    public void run() {
                        for (int j = 1; j <= 10; j++) {
                            try {
                                Thread.sleep(20);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println(Thread.currentThread().getName()
                                    + " is looping of " + j + " for  task of "
                                    + task);
                        }
    
                    }
                });
            }
    
            System.out.println("all of 10 tasks have committed! ");
            // threadPool.shutdownNow();
    
            Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() {
                public void run() {
                    System.out.println("begining!");
    
                }
            }, 6, 2, TimeUnit.SECONDS);
        }

    从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

    newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

    newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

    newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,

    也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

    实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,

    要根据实际任务的类型和数量来进行配置。

    另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

    四.如何合理配置线程池的大小

    如何合理配置线程池大小,仅供参考。

    一般需要根据任务的类型来配置线程池大小:

    如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

    如果是IO密集型任务,参考值可以设置为2*NCPU

    当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,

    再观察任务运行情况和系统负载、资源利用率来进行适当调整。

    参考资料:

    http://ifeve.com/java-threadpool/

    http://blog.163.com/among_1985/blog/static/275005232012618849266/

    http://developer.51cto.com/art/201203/321885.htm

    http://blog.csdn.net/java2000_wl/article/details/22097059

    http://blog.csdn.net/cutesource/article/details/6061229

    http://blog.csdn.net/xieyuooo/article/details/8718741

    http://www.cnblogs.com/dolphin0520/p/3932921.html

  • 相关阅读:
    无线鼠标换电池了
    Jython Interactive Servlet Console YOU WILL NEVER KNOW IT EXECLLENT!!! GOOD
    Accessing Jython from Java Without Using jythonc
    jython podcast cool isnt't it?
    Python里pycurl使用记录
    Creating an Interactive JRuby Console for the Eclipse Environment
    微软为AJAX和jQuery类库提供CDN服务
    Download A File Using Cygwin and cURL
    What is JMRI?这个是做什么用的,我真没看懂但看着又很强大
    用curl 发送指定的大cookie的http/https request
  • 原文地址:https://www.cnblogs.com/weiguo21/p/4813678.html
Copyright © 2011-2022 走看看