zoukankan      html  css  js  c++  java
  • Java并发编程之深入理解线程池原理及实现

    Java线程池在实际的应用开发中十分广泛。虽然Java1.5之后在JUC包中提供了内置线程池可以拿来就用,但是这之前仍有许多老的应用和系统是需要程序员自己开发的。因此,基于线程池的需求背景、技术要求了解线程池原理和实现,一方面可以更为深刻理解Java多线程开发,有助于解决业务系统中因为线程问题所产生的bug;另一方面也有利于对遗留代码的重构。

    如果需要先行了解Java并发编程的基础知识,可参考以下随笔:

    1. Java并发编程之线程创建和启动(Thread、Runnable、Callable和Future)

    2. Java并发编程之线程生命周期、守护线程、优先级、关闭和join、sleep、yield、interrupt

    3. Java并发编程之线程安全、线程通信

    4. Java并发编程之ThreadGroup

    线程池原理

    所谓的线程池,跟JDBC连接池、代理池等一样,属于一种“池”的设计模式。在设计好的数据结构中存放一定数量的线程,并且根据任务需要自动调整线程数量的多少,直到峰值。具体说来,线程池需要满足若干条件:

    1. 任务队列:用于缓存提交的任务

    2. QueueSize:任务队列存放的Runnable任务实例的数量,需要有限制值防止内存溢出。

    3. 线程数量管理:创建线程时初始的数量init;线程池自动扩充时最大的线程数量max;空闲时的活跃线程或核心线程数量core。三者满足init<=core<=max

    4. 工作线程队列:用于存储工作线程,并统计工作线程的数量。

    5. 任务拒绝策略:如果线程数量达到上限且任务队列已满,需要有拒绝策略通知任务提交者,这个在工程实践中非常重要。

    6. 线程工厂:用于个性化定制线程,如设置守护线程、线程名称等。

    7. Keepedalive时间:线程各个重要参数自动维护的时间间隔。

    线程池原理图如下:

    线程池实现

    “模块设计,类图先行”。明确了线程池需要实现的功能之后,就可以画出线程池的草图了,核心接口及实现类如下:

    ThreadPool接口

    ThreadPool接口主要定义一个线程池的基本属性,如任务提交、初始容量、最大容量、核心容量等。实现代码如下:

     1 public interface ThreadPool {
     2     
     3     //submit tasks to thread pool
     4     void execute(Runnable runnable);
     5     //close pool
     6     void shutdown();
     7     //get the initial size of pool
     8     int getInitSize();
     9     //get the max size of pool
    10     int getMaxSize();
    11     //get the core size of pool
    12     int getCoreSize();
    13     //get the cache tasks queue of pool
    14     int getQueueSize();
    15     //get the active thread volume of pool
    16     int getActiveCount();
    17     //check if pool has been shutdown
    18     boolean isShutdown();
    19 }

    RunnableQueue接口

    这个接口的作用与BlockingQueue接口一样,用于存储提交的Runnable实现类任务。

    1 public interface RunnableQueue {
    2     //提交任务到队列
    3     void offer(Runnable runnable);
    4     //从队列中获取任务
    5     Runnable take() throws InterruptedException;
    6     //返回队列中任务数
    7     int size();
    8 }

    ThreadFactory接口

    定义了个性化创建线程的工厂方法

    1 @FunctionalInterface
    2 public interface ThreadFactory {
    3     
    4     Thread createThread(Runnable runnable);
    5 
    6 }

    DenyPolicy接口

    定义了线程池的拒绝策略,即当任务队列达到上限时,采取何种措施拒绝。注意接口内定义了内部类作为外围接口的实现类(该类自动为public和static,像这种嵌套类的实现,可查询《Java编程思想》)。

     1 @FunctionalInterface
     2 public interface DenyPolicy {
     3     
     4     void reject(Runnable runnable, ThreadPool threadPool);
     5     //定义嵌套类作为拒绝策略的实现类
     6     //1.拒绝并丢弃任务
     7     class DiscardDenyPolicy implements DenyPolicy{
     8 
     9         @Override
    10         public void reject(Runnable runnable, ThreadPool threadPool) {
    11             
    12         }    
    13     }
    14     
    15     //2.拒绝并抛出自定义异常
    16     class AbortDenyPolicy implements DenyPolicy{
    17 
    18         @Override
    19         public void reject(Runnable runnable, ThreadPool threadPool) {
    20             throw new RunnableDenyException("The runnable " + runnable + " will abort.");
    21         }        
    22     }
    23     
    24     //3.拒绝, 使用提交者所在线程来完成线程任务.
    25     class RunnerDenyPolicy implements DenyPolicy{
    26 
    27         @Override
    28         public void reject(Runnable runnable, ThreadPool threadPool) {
    29 
    30             if(!threadPool.isShutdown()) {
    31                 runnable.run();
    32             }
    33         }    
    34     }
    35 }

    其实实现了自定义异常类RunnableDenyException:

    1 public class RunnableDenyException extends RuntimeException {
    2     
    3     private static final long serialVersionUID = 112311231231412L;
    4 
    5     public RunnableDenyException(String message) {
    6         super(message);
    7     }
    8 }

    InternalTask实现类

    Runnable的实现类,会使用到RunnableQueue,它的作用其实是封装了一个任务实例,把Runnable任务的run方法封装到自己的Run方法实现中,并且提供了一个stop方法,用于在线程池销毁或数量维护时停止当前线程。

     1 public class InternalTask implements Runnable {
     2     //组合一个RunnableQueue的引用
     3     private final RunnableQueue runnableQueue;
     4     //使用volatile关键字修饰开关变量
     5     private volatile boolean running = true;
     6     
     7     public InternalTask(RunnableQueue runnableQueue) {
     8         this.runnableQueue = runnableQueue;
     9     }
    10     @Override
    11     public void run() {
    12         // if current task match "both running and isInterrupt" are true
    13         // continue to take runnable from queue and run
    14         while(running && !Thread.currentThread().isInterrupted()) {
    15             try {
    16                 Runnable task = runnableQueue.take();
    17                 task.run();
    18             } catch (Exception e) {
    19                 running = false;
    20                 break;
    21             }
    22         }
    23 
    24     }
    25     //停止线程的开关方法
    26     public void stop() {
    27         this.running = false;
    28     }
    29 }

    到这里,一个基本线程池的骨架就搭好了,接下来主要是实现各接口,实现具体的方法。

    1. 队列的实现类LinkedRunnableQueue

     1 public class LinkedRunnableQueue implements RunnableQueue {
     2     //设置队列上限
     3     private final int limit;
     4     //设置拒绝策略的引用
     5     private final DenyPolicy denyPolicy;
     6     //使用LinkedList作为队列的具体实现类
     7     private final LinkedList<Runnable> runnableList = new LinkedList<>();
     8     //设置线程池的引用
     9     private final ThreadPool threadPool;
    10     //构造方法时赋初始值
    11     public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
    12         this.limit = limit;
    13         this.denyPolicy = denyPolicy;
    14         this.threadPool = threadPool;
    15     }
    16 
    17     @Override
    18     public void offer(Runnable runnable) {
    19         //使用同步锁, 确保入队的线程安全
    20         synchronized (runnableList) {
    21             //当达到队列上限, 调用拒绝策略;否则加入队尾, 并唤醒阻塞中的线程.
    22             if(runnableList.size() >= limit) {
    23                 denyPolicy.reject(runnable, threadPool);
    24             }else {
    25                 runnableList.addLast(runnable);
    26                 runnableList.notifyAll();
    27             }
    28         }
    29     }
    30 
    31     @Override
    32     public Runnable take() throws InterruptedException {
    33         synchronized (runnableList) {
    34             
    35             while(runnableList.isEmpty()) {
    36                 try {
    37                     //如果队列中没有可执行任务, 线程挂起, 进入runnableList关联的monitor waitset中等待被唤醒
    38                     runnableList.wait();
    39                 } catch (InterruptedException e) {
    40                     //如果被中断, 需要抛出异常
    41                     throw e;
    42                 }
    43             }
    44             return runnableList.removeFirst();
    45         }
    46     }
    47 
    48     @Override
    49     public int size() {
    50         synchronized (runnableList) {
    51             //返回队列中的任务数量
    52             return runnableList.size();
    53         }
    54     }
    55 }

    2. 线程工厂的实现

     1 public class DefaultThreadFactory implements ThreadFactory {
     2     //定义原子类的Integer作为线程组的计数
     3     private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
     4     //定义线程组对象
     5     private static final ThreadGroup group = new ThreadGroup("MyThreadPool-"+ GROUP_COUNTER.getAndDecrement());
     6     //定义生产的线程计数
     7     private static final AtomicInteger COUNTER = new AtomicInteger(0);
     8 
     9     @Override
    10     public Thread createThread(Runnable runnable) {
    11         return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement());
    12     }
    13 }

    3. 线程池的实现

    线程池的实现相对比较复杂, 运用了多种设计模式的思想,核心的要点包括:

    1. 使用私有内部类的方式来复用Thread类,防止向外暴露Thread类的方法;

    2. 核心组成部分主要是LinkedList实现的任务队列和ArrayDeque实现的工作线程队列,构成了主要的存储主体。

    3. 核心的扩容机制需要RunnableQueue + InternalTask + ThreadFactory的结合, 简单说来就是通过判定任务数是否达到阈值,然后增加工作线程的数量。

      1 public class BasicThreadPool implements ThreadPool {
      2     //为了不暴露Thread类的方法, 使用私有内部类WorkThread来继承Thread类
      3     private WorkThread workThread;
      4     //线程池的基本属性
      5     private final int initSize;
      6     private final int maxSize;
      7     private final int coreSize;
      8     private int activeCount;
      9     //线程工厂引用
     10     private final ThreadFactory threadFactory;
     11     //队列引用
     12     private final RunnableQueue runnableQueue;
     13     //线程池销毁标识
     14     private volatile boolean isShutdown = false;
     15     //工作线程的队列, 使用ArrayDeque实现
     16     private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
     17     //定义了一个默认的拒绝策略
     18     private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
     19     //定义了一个默认的工厂对象
     20     private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
     21     
     22     private final long keepAliveTime;
     23     private final TimeUnit timeUnit;
     24     //默认的构造器, 只需要传入初始容量, 最大容量, 核心容量和队列上限
     25     public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
     26         this(initSize, maxSize, coreSize, queueSize, DEFAULT_THREAD_FACTORY, 
     27                 DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
     28     }
     29     //完整构造器
     30     public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory,
     31             DenyPolicy denyPolicy,long keepAliveTime, TimeUnit timeUnit) {
     32         this.workThread = new WorkThread();
     33         this.initSize = initSize;
     34         this.maxSize = maxSize;
     35         this.coreSize = coreSize;
     36         this.threadFactory = threadFactory;
     37         this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
     38         this.keepAliveTime = keepAliveTime;
     39         this.timeUnit = timeUnit;
     40         this.init();
     41     }
     42     //线程池的初始化方法, 在构造器中被调用, 用于启动工作线程
     43     private void init() {
     44         workThread.start();
     45         for(int i = 0; i < initSize; i++) {
     46             newThread();
     47         }
     48     }
     49     //封装了工作线程的启动方法: 
     50     //1. 使用InternalTask封装RunnableQueue对象
     51     //2. 通过工厂方法制造工作线程并启动
     52     //3. 工作线程入队, 工作线程队列计数器+1
     53     private void newThread() {
     54         InternalTask internalTask = new InternalTask(runnableQueue);
     55         Thread thread = this.threadFactory.createThread(internalTask);
     56         ThreadTask threadTask = new ThreadTask(thread, internalTask);
     57         threadQueue.offer(threadTask);
     58         this.activeCount++;
     59         thread.start();
     60     }
     61     //工作线程出队的方法
     62     private void removeThread() {
     63         ThreadTask threadTask = threadQueue.remove();
     64         threadTask.internalTask.stop();
     65         this.activeCount--;
     66     }
     67     //核心:通过内部类继承Thread方法, 设计了自动扩容的机制.
     68     //为了防止过快增加到Max容量, 使用continue来退出循环
     69     private class WorkThread extends Thread{
     70         @Override
     71         public void run() {
     72             while(!isShutdown && !isInterrupted()) {
     73                 try {
     74                     timeUnit.sleep(keepAliveTime);
     75                 } catch (InterruptedException e) {
     76                     isShutdown = true;
     77                     break;
     78                 }
     79                 synchronized (this) {
     80                     if(isShutdown) {
     81                         break;
     82                     }
     83                     if(runnableQueue.size() > 0 && activeCount < coreSize) {
     84                         for(int i = initSize; i<coreSize;i++) {
     85                             newThread();
     86                         }
     87                         continue;
     88                     }
     89                     if(runnableQueue.size() > 0 && activeCount < maxSize) {
     90                         for(int i = coreSize; i<maxSize;i++) {
     91                             newThread();
     92                         }
     93                     }
     94                     if(runnableQueue.size()==0 && activeCount > coreSize) {
     95                         for(int i = coreSize; i < activeCount; i++) {
     96                             removeThread();
     97                         }
     98                     }
     99                     
    100                 }
    101             }
    102         }
    103     }
    104 
    105     @Override
    106     public void execute(Runnable runnable) {
    107         //如果线程池已经销毁, 将抛出异常
    108         if(this.isShutdown) {
    109             throw new IllegalStateException("the thread pool is destoried");
    110         }
    111         this.runnableQueue.offer(runnable);    
    112     }
    113 
    114     @Override
    115     public void shutdown() {
    116         synchronized(this) {
    117             //防止重复销毁
    118             if(isShutdown) {
    119                 return;
    120             }
    121             //重置关闭标识
    122             isShutdown = true;
    123             //关闭任务工作线程
    124             threadQueue.forEach(threadTask -> {
    125                 threadTask.internalTask.stop();
    126                 threadTask.thread.interrupt();
    127             });
    128             //关闭线程池的工作线程
    129             this.workThread.interrupt();
    130         }
    131     }
    132 
    133     @Override
    134     public int getInitSize() {
    135         if(isShutdown) {
    136             throw new IllegalStateException("The thread pool is destroy");
    137         }
    138         return this.initSize;
    139     }
    140 
    141     @Override
    142     public int getMaxSize() {
    143         if(isShutdown) {
    144             throw new IllegalStateException("The thread pool is destroy");
    145         }
    146         return this.maxSize;
    147     }
    148 
    149     @Override
    150     public int getCoreSize() {
    151         if(isShutdown) {
    152             throw new IllegalStateException("The thread pool is destroy");
    153         }
    154         return this.coreSize;
    155     }
    156 
    157     @Override
    158     public int getQueueSize() {
    159         if(isShutdown) {
    160             throw new IllegalStateException("The thread pool is destroy");
    161         }
    162         return runnableQueue.size();
    163     }
    164 
    165     @Override
    166     public int getActiveCount() {
    167         synchronized(this) {
    168             return this.activeCount;
    169         }
    170     }
    171 
    172     @Override
    173     public boolean isShutdown() {        
    174         return this.isShutdown;
    175     }
    176 }

    线程池的测试

    编写一个简单的测试类,同时启动20个任务,测试线程池的活动状态:

     1 public class ThreadPoolTest {
     2 
     3     public static void main(String[] args) throws InterruptedException {
     4         
     5         final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);
     6         
     7         for(int i = 0; i < 20; i++) {
     8             threadPool.execute(() -> {
     9                 try {
    10                     TimeUnit.SECONDS.sleep(10);
    11                     System.out.println(Thread.currentThread().getName() + "is Running and done");
    12                 } catch (InterruptedException e) {
    13                     e.printStackTrace();
    14                 }
    15             });
    16         }
    17         while(true) {
    18             System.out.println("getActiveCount: " + threadPool.getActiveCount());
    19             System.out.println("getQueueSize: " + threadPool.getQueueSize());
    20             System.out.println("getCoreSize: " + threadPool.getCoreSize());
    21             System.out.println("getMaxSize: " + threadPool.getMaxSize());
    22             System.out.println("================================================");
    23             TimeUnit.SECONDS.sleep(5);            
    24         }
    25     }
    26 }

    输出结果如下

    thread-pool--1is Running and done
    thread-pool-0is Running and done
    getActiveCount: 4
    getQueueSize: 14
    getCoreSize: 4
    getMaxSize: 6
    ================================================
    getActiveCount: 4
    getQueueSize: 14
    getCoreSize: 4
    getMaxSize: 6
    ================================================
    thread-pool--3is Running and done
    thread-pool--2is Running and done
    thread-pool--1is Running and done
    thread-pool-0is Running and done
    getActiveCount: 6
    getQueueSize: 8
    getCoreSize: 4
    getMaxSize: 6
  • 相关阅读:
    ABS(引数と同じ大きさの正の数を返す)
    WRITE
    LEAVE TO LIST-PROCESSING
    SHIFT(文字列の指定位置数の移動)
    【EXCEL】SUMIF(条件を指定して数値を合計する)
    【EXCEL】SUMIFS(複数の条件を指定して数値を合計する)
    【財務会計】流動資産と固定資産の違いとは?ディズニーを例にわかりやすく解説
    【財務会計】償却 とは
    【財務会計】固定資産の除却と廃棄の違い
    尽力去做你力所能及的事
  • 原文地址:https://www.cnblogs.com/leoliu168/p/9936085.html
Copyright © 2011-2022 走看看