zoukankan      html  css  js  c++  java
  • Java 线程池的原理与实现(转)

    这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧。
    线程池就是其中之一,一提到线程,我们会想到以前《操作系统》的生产者与消费者,信号量,同步控制等等。
    一提到池,我们会想到数据库连接池,但是线程池又如何呢?

    建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用法。
    关于我对同步的认识,要缘于大三年的一本书,书名好像是 Java 实战,这本书写得实在太妙了,真正的从理论到实践,从截图分析到.class字节码分析。哇,我想市场上很难买到这么精致的书了。作为一个Java爱好者,我觉得绝对值得一读。
    我对此书印象最深之一的就是:equal()方法,由浅入深,经典!
    还有就是同步了,其中提到了我的几个编程误区,以前如何使用同步提高性能等等,通过学习,使我对同步的认识进一步加深。
    --------------------------------------------------------------------------------------------------
    简单介绍

        创建线程有两种方式:继承Thread或实现Runnable。Thread实现了Runnable接口,提供了一个空的run()方法,所以不论是继承Thread还是实现Runnable,都要有自己的run()方法。
        一个线程创建后就存在,调用start()方法就开始运行(执行run()方法),调用wait进入等待或调用sleep进入休眠期,顺利运行完毕或休眠被中断或运行过程中出现异常而退出。

    wait和sleep比较:
        sleep方法有:sleep(long millis),sleep(long millis, long nanos),调用sleep方法后,当前线程进入休眠期,暂停执行,但该线程继续拥有监视资源的所有权。到达休眠时间后线程将继续执行,直到完成。若在 休眠期另一线程中断该线程,则该线程退出。
        wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),调用wait方法后,该线程放弃监视资源的所有权进入等待状态;
          wait():等待有其它的线程调用notify()或notifyAll()进入调度状态,与其它线程共同争夺监视。wait()相当于wait(0),wait(0, 0)。
          wait(long timeout):当其它线程调用notify()或notifyAll(),或时间到达timeout亳秒,或有其它某线程中断该线程,则该线程进入调度状态。
          wait(long timeout, long nanos):相当于wait(1000000*timeout + nanos),只不过时间单位为纳秒。
    ===================================================================================================

    线程池:
        多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
       
        假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
       
        如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
                    一个线程池包括以下四个基本组成部分:
                    1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                    2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                    3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                    4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                   
        线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

        线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

        假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线 程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建 50000而在处理请求时浪费时间,从而提高效率。
    ------------------------------------------------------------------------------
    好了,废话就到这里了,下面就是程序了,我也不讲解了,注释已经很清晰了:

    /** 线程池类,工作线程作为其内部类 **/

      1 package org.ymcn.util;
      2 
      3 import java.util.Collections;
      4 import java.util.Date;
      5 import java.util.LinkedList;
      6 import java.util.List;
      7 
      8 import org.apache.log4j.Logger;
      9 
     10 /**
     11 * 线程池
     12 * 创建线程池,销毁线程池,添加新任务
     13 *
     14 * @author obullxl
     15 */
     16 public final class ThreadPool {
     17     private static Logger logger = Logger.getLogger(ThreadPool.class);
     18     private static Logger taskLogger = Logger.getLogger("TaskLogger");
     19 
     20     private static boolean debug = taskLogger.isDebugEnabled();
     21     // private static boolean debug = taskLogger.isInfoEnabled();
     22     /* 单例 */
     23     private static ThreadPool instance = ThreadPool.getInstance();
     24 
     25     public static final int SYSTEM_BUSY_TASK_COUNT = 150;
     26     /* 默认池中线程数 */
     27     public static int worker_num = 5;
     28     /* 已经处理的任务数 */
     29     private static int taskCounter = 0;
     30 
     31     public static boolean systemIsBusy = false;
     32 
     33     private static List<Task> taskQueue = Collections
     34             .synchronizedList(new LinkedList<Task>());
     35     /* 池中的所有线程 */
     36     public PoolWorker[] workers;
     37 
     38     private ThreadPool() {
     39         workers = new PoolWorker[5];
     40         for (int i = 0; i < workers.length; i++) {
     41             workers[i] = new PoolWorker(i);
     42         }
     43     }
     44 
     45     private ThreadPool(int pool_worker_num) {
     46         worker_num = pool_worker_num;
     47         workers = new PoolWorker[worker_num];
     48         for (int i = 0; i < workers.length; i++) {
     49             workers[i] = new PoolWorker(i);
     50         }
     51     }
     52 
     53     public static synchronized ThreadPool getInstance() {
     54         if (instance == null)
     55             return new ThreadPool();
     56         return instance;
     57     }
     58     /**
     59     * 增加新的任务
     60     * 每增加一个新任务,都要唤醒任务队列
     61     * @param newTask
     62     */
     63     public void addTask(Task newTask) {
     64         synchronized (taskQueue) {
     65             newTask.setTaskId(++taskCounter);
     66             newTask.setSubmitTime(new Date());
     67             taskQueue.add(newTask);
     68             /* 唤醒队列, 开始执行 */
     69             taskQueue.notifyAll();
     70         }
     71         logger.info("Submit Task<" + newTask.getTaskId() + ">: "
     72                 + newTask.info());
     73     }
     74     /**
     75     * 批量增加新任务
     76     * @param taskes
     77     */
     78     public void batchAddTask(Task[] taskes) {
     79         if (taskes == null || taskes.length == 0) {
     80             return;
     81         }
     82         synchronized (taskQueue) {
     83             for (int i = 0; i < taskes.length; i++) {
     84                 if (taskes[i] == null) {
     85                     continue;
     86                 }
     87                 taskes[i].setTaskId(++taskCounter);
     88                 taskes[i].setSubmitTime(new Date());
     89                 taskQueue.add(taskes[i]);
     90             }
     91             /* 唤醒队列, 开始执行 */
     92             taskQueue.notifyAll();
     93         }
     94         for (int i = 0; i < taskes.length; i++) {
     95             if (taskes[i] == null) {
     96                 continue;
     97             }
     98             logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
     99                     + taskes[i].info());
    100         }
    101     }
    102     /**
    103     * 线程池信息
    104     * @return
    105     */
    106     public String getInfo() {
    107         StringBuffer sb = new StringBuffer();
    108         sb.append("
    Task Queue Size:" + taskQueue.size());
    109         for (int i = 0; i < workers.length; i++) {
    110             sb.append("
    Worker " + i + " is "
    111                     + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
    112         }
    113         return sb.toString();
    114     }
    115     /**
    116     * 销毁线程池
    117     */
    118     public synchronized void destroy() {
    119         for (int i = 0; i < worker_num; i++) {
    120             workers[i].stopWorker();
    121             workers[i] = null;
    122         }
    123         taskQueue.clear();
    124     }
    125 
    126     /**
    127     * 池中工作线程
    128     *
    129     * @author obullxl
    130     */
    131     private class PoolWorker extends Thread {
    132         private int index = -1;
    133         /* 该工作线程是否有效 */
    134         private boolean isRunning = true;
    135         /* 该工作线程是否可以执行新任务 */
    136         private boolean isWaiting = true;
    137 
    138         public PoolWorker(int index) {
    139             this.index = index;
    140             start();
    141         }
    142 
    143         public void stopWorker() {
    144             this.isRunning = false;
    145         }
    146 
    147         public boolean isWaiting() {
    148             return this.isWaiting;
    149         }
    150         /**
    151         * 循环执行任务
    152         * 这也许是线程池的关键所在
    153         */
    154         public void run() {
    155             while (isRunning) {
    156                 Task r = null;
    157                 synchronized (taskQueue) {
    158                     while (taskQueue.isEmpty()) {
    159                         try {
    160                             /* 任务队列为空,则等待有新任务加入从而被唤醒 */
    161                             taskQueue.wait(20);
    162                         } catch (InterruptedException ie) {
    163                             logger.error(ie);
    164                         }
    165                     }
    166                     /* 取出任务执行 */
    167                     r = (Task) taskQueue.remove(0);
    168                 }
    169                 if (r != null) {
    170                     isWaiting = false;
    171                     try {
    172                         if (debug) {
    173                             r.setBeginExceuteTime(new Date());
    174                             taskLogger.debug("Worker<" + index
    175                                     + "> start execute Task<" + r.getTaskId() + ">");
    176                             if (r.getBeginExceuteTime().getTime()
    177                                     - r.getSubmitTime().getTime() > 1000)
    178                                 taskLogger.debug("longer waiting time. "
    179                                         + r.info() + ",<" + index + ">,time:"
    180                                         + (r.getFinishTime().getTime() - r
    181                                                 .getBeginExceuteTime().getTime()));
    182                         }
    183                         /* 该任务是否需要立即执行 */
    184                         if (r.needExecuteImmediate()) {
    185                             new Thread(r).start();
    186                         } else {
    187                             r.run();
    188                         }
    189                         if (debug) {
    190                             r.setFinishTime(new Date());
    191                             taskLogger.debug("Worker<" + index
    192                                     + "> finish task<" + r.getTaskId() + ">");
    193                             if (r.getFinishTime().getTime()
    194                                     - r.getBeginExceuteTime().getTime() > 1000)
    195                                 taskLogger.debug("longer execution time. "
    196                                         + r.info() + ",<" + index + ">,time:"
    197                                         + (r.getFinishTime().getTime() - r
    198                                                 .getBeginExceuteTime().getTime()));
    199                         }
    200                     } catch (Exception e) {
    201                         e.printStackTrace();
    202                         logger.error(e);
    203                     }
    204                     isWaiting = true;
    205                     r = null;
    206                 }
    207             }
    208         }
    209     }
    210 }

    /** 任务接口类 **/

      1 package org.ymcn.util;
      2 
      3 import java.util.Date;
      4 
      5 /**
      6 * 所有任务接口
      7 * 其他任务必须继承访类
      8 *
      9 * @author obullxl
     10 */
     11 public abstract class Task implements Runnable {
     12     // private static Logger logger = Logger.getLogger(Task.class);
     13     /* 产生时间 */
     14     private Date generateTime = null;
     15     /* 提交执行时间 */
     16     private Date submitTime = null;
     17     /* 开始执行时间 */
     18     private Date beginExceuteTime = null;
     19     /* 执行完成时间 */
     20     private Date finishTime = null;
     21 
     22     private long taskId;
     23 
     24     public Task() {
     25         this.generateTime = new Date();
     26     }
     27 
     28     /**
     29     * 任务执行入口
     30     */
     31     public void run() {
     32         /**
     33         * 相关执行代码
     34         *
     35         * beginTransaction();
     36         *
     37         * 执行过程中可能产生新的任务 subtask = taskCore();
     38         *
     39         * commitTransaction();
     40         *
     41         * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
     42         */
     43     }
     44 
     45     /**
     46     * 所有任务的核心 所以特别的业务逻辑执行之处
     47     *
     48     * @throws Exception
     49     */
     50     public abstract Task[] taskCore() throws Exception;
     51 
     52     /**
     53     * 是否用到数据库
     54     *
     55     * @return
     56     */
     57     protected abstract boolean useDb();
     58 
     59     /**
     60     * 是否需要立即执行
     61     *
     62     * @return
     63     */
     64     protected abstract boolean needExecuteImmediate();
     65 
     66     /**
     67     * 任务信息
     68     *
     69     * @return String
     70     */
     71     public abstract String info();
     72 
     73     public Date getGenerateTime() {
     74         return generateTime;
     75     }
     76 
     77     public Date getBeginExceuteTime() {
     78         return beginExceuteTime;
     79     }
     80 
     81     public void setBeginExceuteTime(Date beginExceuteTime) {
     82         this.beginExceuteTime = beginExceuteTime;
     83     }
     84 
     85     public Date getFinishTime() {
     86         return finishTime;
     87     }
     88 
     89     public void setFinishTime(Date finishTime) {
     90         this.finishTime = finishTime;
     91     }
     92 
     93     public Date getSubmitTime() {
     94         return submitTime;
     95     }
     96 
     97     public void setSubmitTime(Date submitTime) {
     98         this.submitTime = submitTime;
     99     }
    100 
    101     public long getTaskId() {
    102         return taskId;
    103     }
    104 
    105     public void setTaskId(long taskId) {
    106         this.taskId = taskId;
    107     }
    108 
    109 }
  • 相关阅读:
    Springboot Endpoint之二:Endpoint源码剖析
    Linux进程被杀掉(OOM killer),查看系统日志
    docker常用命令详解
    micrometer自定义metrics
    使有prometheus监控redis,mongodb,nginx,mysql,jmx
    Grafana+Prometheus打造springboot监控平台
    Grafana介绍
    Prometheus介绍
    Groovy与Java集成常见的坑
    ES之1:基本概念及原理
  • 原文地址:https://www.cnblogs.com/xingmeng/p/3267933.html
Copyright © 2011-2022 走看看