zoukankan      html  css  js  c++  java
  • 【Java多线程】线程池学习

    Java线程池学习

    众所周知,Java不仅提供了线程,也提供了线程池库给我们使用,那么今天来学学线程池的具体使用以及线程池基本实现原理分析。


    ThreadPoolExecutor

    ThreadPoolExecutor的构造方法:

    public ThreadPoolExecutor(int corePoolSize,  
                              int maximumPoolSize,  
                              long keepAliveTime,  
                              TimeUnit unit,  
                              BlockingQueue<Runnable> workQueue,  
                              RejectedExecutionHandler handler) {  
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
             Executors.defaultThreadFactory(), handler);  
    }

    参数解释如下:

    • corePoolSize : 核心池大小,即线程的数量
    • maximumPoolSize : 线程池最大数量
    • keepAliveTime : 线程池中的线程的保活时间,超时后线程会退出直到线程池中的线程数量等于corePoolSize,如果allowCoreThreadTimeout为true,线程数量会降为0
    • workQueue:线程池采用的缓冲队列,常用的有:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
    • handler:当线程池中线程数量达到maximumPoolSize时,仍有任务需要创建线程来完成,则handler采取相应的策略,默认的ThreadPoolExecutor.AbortPolicy(),即会抛弃该任务。

    线程池中可能的几种状态:

    • 当线程数量小于corePoolSize时,任务来时会创建新的线程来处理,并把该线程加入线程队列中(实际上是一个hashset)(此步骤需要获取全局锁,ReentryLock)

    • 如果当前线程数量达到了corePoolSize,任务来时将任务加入BlockingQueue
    • 如果任务列队满了无法加入新的任务时,会创建新的线程(同样需要获取全局锁)
    • 如果线程池数量达到maximumPoolSize,并且任务队列已满,新的任务将被拒绝

    注意:获取全局锁是一个非常影响性能的因素,所以线程池会尽量执行第二步,因为此步骤不需要获取全局锁。

    各种任务队列(BlockingQueue)的区别

    • ArrayBlockingQueue: 基于数组的有界阻塞队列,FIFO

    • LinkedBlockingQueue:基于链表的无界阻塞队列,FIFO,吞吐量高于ArrayBlockingQueue,Executors.newFixedThreadPool()使用了该队列

    • SynchronousQueue:一个不存储元素的队列,一进一出吞吐量高于LinkedBlockingQueue,Executors.newCachedThreadPool()使用了该队列

    • PriorityBlockingQueue:阻塞优先队列

    RejectedExecutionHandler饱和拒绝策略

    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy:只用调用者所在线程来运行任务
    • DiscardOldestPolicy:丢弃队列里最近的任务,并执行当前任务
    • DiscardPolicy:不处理,不丢弃

    提交一个任务

    两种方式:

    threadPool.submit(new Runnable() {
            @Override
            public void run() {
                //
            }
        });
            

    上面是第一种,通过submit方法提交,该方法有返回值,返回类型是Future<?>类型,

    threadPool.execute(new Runnable() {
            @Override
            public void run() {
                //
            }
        }); 

    上面是第二种,通过execute执行,无返回值。

    关闭线程池

    shutdown()和showdownNow()两种方法,都是调用了interruptIdleWorkers()方法去遍历线程池中的工作线程,然后去打断它们,代码如下:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //中断线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    不同的是shutdown是把线程的状态置为SHUTDOWN而shutdownNow是STOP。

    调优

    • 尽量使用有界队列
    • 线程池中的线程数量应该根据场景来合理配置,例如CPU密集型的任务就应该配置尽量少的线程数量

    Executors工具类

    主要介绍工具类Executors创建的三种线程池:

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    ExecutorService executorService1 = Executors.newFixedThreadPool(10);
    ExecutorService executorService2 = Executors.newCachedThreadPool();

    FixedThreadPool

    源码如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    可以看出core和max数量是一样的,因为使用的LinkedBlockingQueue是无界阻塞队列,因此达到coreSize后不会继续创建线程而是阻塞在队列那了。

    SingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    只创建一个线程的线程池。

    CachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }   

    使用了SynchronousQueue这个没有容量的阻塞队列,线程池的coreSize为0,maxSize为无限大,所以当生产者提供任务的速度大于消费者处理任务的时候,可能会无线创建线程,从而导致系统资源(CPU,内存等)耗竭。

  • 相关阅读:
    【Git】分支管理
    【Java】jprofiler工具初上手
    【中间件】JMS
    【Idea】编译问题Intellij Error: Internal caches are corrupted or have outdated format
    【测试】测试用例选择
    【DevOps】
    【Unix】z/OS系统
    【数据库】表空间释放
    【数据库pojo类】Oracle数据库中各数据类型在Hibernate框架中的映射
    基于闭包实现的显示与隐藏
  • 原文地址:https://www.cnblogs.com/jpfss/p/9059290.html
Copyright © 2011-2022 走看看