zoukankan      html  css  js  c++  java
  • Java 线程池的原理及实现

    1、线程池简介:

    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

    假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

    一个线程池包括以下四个基本组成部分:

      1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;

      2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;

      3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;

      4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                    
    线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

    假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

    线程池实现代码:

      1 package com.wb.thread;
      2 
      3 import java.util.LinkedList;
      4 import java.util.List;
      5 
      6 /**
      7  * 线程池类
      8  * @author wangbo
      9  *
     10  */
     11 public class ThreadPool {
     12     
     13     private static int worker_num = 5;//线程池中线程的个数,默认为5
     14     
     15     private WorkThread[] workthreads;//工作线程
     16     
     17     private static volatile int finished_task = 0;//未处理的任务
     18     
     19     private List<Runnable> taskQueue = new LinkedList<Runnable>();//任务队列
     20     
     21     private static ThreadPool threadPool;
     22     
     23     /**
     24      * 无参构造器,创建线程池
     25      */
     26     private ThreadPool(){
     27         this(5);
     28     }
     29     
     30     /**
     31      * 含参构造器,创建线程池
     32      * @param num
     33      */
     34     private ThreadPool(int num){
     35         worker_num = num;
     36         workthreads = new WorkThread[num];
     37         for (int i = 0; i < workthreads.length; i++) {
     38             workthreads[i] = new WorkThread();
     39             workthreads[i].start();//开启线程
     40         }
     41     }
     42     
     43     /**
     44      * 获得一个默认线程个数的线程池
     45      * @return
     46      */
     47     public static ThreadPool getThreadPool(){
     48         return getThreadPool(ThreadPool.worker_num);
     49     }
     50     
     51     /**
     52      * 获得一个指定线程个数的线程池
     53      * @param num
     54      * @return
     55      */
     56     public static ThreadPool getThreadPool(int num) {
     57         if (num <= 0) {
     58             num = ThreadPool.worker_num;
     59         }
     60         if (threadPool == null) {
     61             threadPool = new ThreadPool(num);
     62         }
     63         return threadPool;
     64     }
     65 
     66     /**
     67      * 将任务单个添加到队列
     68      * @param task
     69      */
     70     public void execute(Runnable task){
     71         synchronized (taskQueue) {
     72             taskQueue.add(task);
     73             taskQueue.notify();
     74         }
     75     }
     76     
     77     /**
     78      * 将任务批量添加到队列
     79      * @param tasks
     80      */
     81     public void execute(Runnable[] tasks){
     82         synchronized (taskQueue) {
     83             for (Runnable runnable : tasks) {
     84                 taskQueue.add(runnable);
     85             }
     86             taskQueue.notify();
     87         }
     88     }
     89     
     90     /**
     91      * 将任务批量添加到队列
     92      * @param tasks
     93      */
     94     public void execute(List<Runnable> tasks){
     95         synchronized (taskQueue) {
     96             for (Runnable runnable : tasks) {
     97                 taskQueue.add(runnable);
     98             }
     99             taskQueue.notify();
    100         }
    101     }
    102     
    103     /**
    104      * 销毁线程池
    105      */
    106     public void destroy(){
    107         //还有任务没有执行完
    108         while(!taskQueue.isEmpty()){
    109             try {
    110                 Thread.sleep(10);
    111             } catch (InterruptedException e) {
    112                 e.printStackTrace();
    113             }
    114         }
    115         //停止工作线程,且置为null
    116         for (int i = 0; i < workthreads.length; i++) {
    117             workthreads[i].stopWorker();
    118             workthreads[i] = null;
    119         }
    120         threadPool = null;
    121         taskQueue.clear();//清空队列
    122     }
    123     
    124     /**
    125      * 获取工作线程的个数
    126      * @return
    127      */
    128     public int getWorkThreadNumber(){
    129         return worker_num;
    130     }
    131     
    132     /**
    133      * 获取已完成任务数量
    134      * @return
    135      */
    136     public int getFinishedTaskNumber(){
    137         return finished_task;
    138     }
    139     
    140     /**
    141      * 获取未完成任务数量
    142      * @return
    143      */
    144     public int getWaitTaskNumber(){
    145         return taskQueue.size();
    146     }
    147     
    148     /**
    149      * 获取线程池信息
    150      */
    151     @Override
    152     public String toString() {
    153         return "工作线程数量:" + getWorkThreadNumber() 
    154                 + ",已完成任务数量" + getFinishedTaskNumber()
    155                 + ",未完成任务数量" + getWaitTaskNumber();
    156                 
    157     }
    158 
    159     /**
    160      * 内部类,工作线程
    161      * @author wangbo
    162      *
    163      */
    164     private class WorkThread extends Thread{
    165         
    166         private boolean isRunning = true;//线程有效标志
    167 
    168         @Override
    169         public void run() {
    170             Runnable runnable = null;
    171             while (isRunning) {
    172                 synchronized (taskQueue) {
    173                     //队列为空
    174                     while (isRunning && taskQueue.isEmpty()) {
    175                         try {
    176                             taskQueue.wait(20);
    177                         } catch (InterruptedException e) {
    178                             e.printStackTrace();
    179                         }
    180                     }
    181                     //队列不为空
    182                     if (!taskQueue.isEmpty()) {
    183                         runnable = taskQueue.remove(0);//去除任务
    184                     }
    185                 }
    186                 if (runnable != null) {
    187                     runnable.run();//执行任务
    188                 }
    189                 finished_task++;
    190                 runnable = null;
    191             }
    192             
    193         }
    194         
    195         /**
    196          * 停止线程
    197          */
    198         public void stopWorker() {
    199             isRunning = false;
    200         }
    201         
    202     }
    203 
    204 }

    测试代码:

     1 package com.wb.thread;
     2 
     3 public class ThreadPoolTest {
     4 
     5      public static void main(String[] args) {  
     6         // 创建3个线程的线程池  
     7         ThreadPool t = ThreadPool.getThreadPool(3);  
     8         t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
     9         t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
    10         System.out.println(t);  
    11         t.destroy();//所有线程都执行完成才destory  
    12         System.out.println(t);  
    13     }
    14       
    15     // 任务类  
    16     static class Task implements Runnable {
    17         
    18         private static volatile int i = 1;
    19         
    20         @Override
    21         public void run() {// 执行任务
    22             System.out.println("任务 " + (i++) + " 完成");
    23         }  
    24     }  
    25 
    26 }

    2、java类库中提供的线程池简介:

    java.util.concurrent包提供了现成的线程池的实现。

    示例代码:

     1 package com.wb.thread;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 /**
     6  * newCachedThreadPool()
     7  * 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
     8  * 有任务才会创建线程,空闲线程会被保留60s
     9  * @author wangbo
    10  *
    11  */
    12 public class ThreadPoolExecutorTest1 {
    13     
    14     public static void main(String[] args) {
    15         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    16         for (int i = 0; i < 10; i++) {
    17             final int index = i;
    18             try {
    19                 Thread.sleep(1000);
    20             } catch (InterruptedException e) {
    21                 e.printStackTrace();
    22             }
    23             cachedThreadPool.execute(new Runnable() {
    24                 @Override
    25                 public void run() {
    26                     System.out.println(index);
    27                     System.out.println(Thread.currentThread().getName());
    28                 }
    29             });
    30         }
    31     }
    32 
    33 }
     1 package com.wb.thread;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 /**
     6  * newFixedThreadPool(int nThreads)
     7  * 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
     8  * 线程池中包含固定数目的线程,空闲线程会一直保留,参数nThreads表示设定线程池中线程的数目
     9  * @author wangbo
    10  *
    11  */
    12 public class ThreadPoolExecutorTest2 {
    13     
    14     public static void main(String[] args) {
    15         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
    16         for (int i = 0; i < 10; i++) {
    17             final int index = i;
    18             fixedThreadPool.execute(new Runnable() {
    19                 @Override
    20                 public void run() {
    21                     try {
    22                         System.out.println(index);
    23                         System.out.println(Thread.currentThread().getName());
    24                         Thread.sleep(2000);
    25                     } catch (InterruptedException e) {
    26                         e.printStackTrace();
    27                     }
    28                 }
    29             });
    30         }
    31     }
    32 
    33 }
     1 package com.wb.thread;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.ScheduledExecutorService;
     7 import java.util.concurrent.TimeUnit;
     8 /**
     9  * newScheduledThreadPool(int corePoolSize)
    10  * 线程池能按时间计划来执行任务,允许用户设定计划执行任务的时间。
    11  * 参数corePoolSize设定线程池中线程最小数目,当任务较多时,线程池可能会创建更多的工作线程来执行任务。
    12  * @author wangbo
    13  *
    14  */
    15 public class ThreadPoolExecutorTest3 {
    16     
    17     public static void main(String[] args) {
    18         
    19         method1();
    20         method2();
    21         
    22     }
    23     
    24     /**
    25      * 延迟3s执行
    26      */
    27     private static void method1(){
    28         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    29         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
    30         scheduledThreadPool.schedule(new Runnable() {
    31             public void run() {
    32                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    33                 System.out.println("延迟2s执行");
    34             }
    35         }, 2, TimeUnit.SECONDS);
    36     }
    37     
    38     /**
    39      * 延迟2s执行后每3s执行一次
    40      */
    41     private static void method2() {
    42         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    43         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
    44         scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
    45             public void run() {
    46                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    47                 System.out.println("延迟2s执行后每3s执行一次");
    48                 System.out.println(Thread.currentThread().getName());
    49             }
    50         }, 2, 3, TimeUnit.SECONDS);
    51     }
    52 
    53 }
     1 package com.wb.thread;
     2 
     3 import java.util.concurrent.ExecutorService;
     4 import java.util.concurrent.Executors;
     5 /**
     6  * newSingleThreadExecutor(int nThreads)
     7  * 线程池中只有一个线程,它依次执行每个任务。
     8  * 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
     9  * @author wangbo
    10  *
    11  */
    12 public class ThreadPoolExecutorTest4 {
    13     
    14     public static void main(String[] args) {
    15         ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
    16         for (int i = 0; i < 10; i++) {
    17             final int index = i;
    18             singleThreadPool.execute(new Runnable() {
    19                 @Override
    20                 public void run() {
    21                     try {
    22                         System.out.println(index);
    23                         System.out.println(Thread.currentThread().getName());
    24                         Thread.sleep(2000);
    25                     } catch (InterruptedException e) {
    26                         e.printStackTrace();
    27                     }
    28                 }
    29             });
    30         }
    31     }
    32 
    33 }
     1 package com.wb.thread;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 import java.util.concurrent.Executors;
     6 import java.util.concurrent.ScheduledExecutorService;
     7 import java.util.concurrent.TimeUnit;
     8 /**
     9  * newSingleThreadScheduledExecutor()
    10  * 线程池中只有一个线程,它能按照时间计划执行每个任务。
    11  * @author wangbo
    12  *
    13  */
    14 public class ThreadPoolExecutorTest5 {
    15     
    16     public static void main(String[] args) {
    17         
    18         method1();
    19         method2();
    20         
    21     }
    22     
    23     /**
    24      * 延迟3s执行
    25      */
    26     private static void method1(){
    27         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    28         ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
    29         scheduledThreadPool.schedule(new Runnable() {
    30             public void run() {
    31                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    32                 System.out.println("延迟2s执行");
    33             }
    34         }, 2, TimeUnit.SECONDS);
    35     }
    36     
    37     /**
    38      * 延迟2s执行后每3s执行一次
    39      */
    40     private static void method2() {
    41         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    42         ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
    43         scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
    44             public void run() {
    45                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    46                 System.out.println("延迟2s执行后每3s执行一次");
    47                 System.out.println(Thread.currentThread().getName());
    48             }
    49         }, 2, 3, TimeUnit.SECONDS);
    50     }
    51 
    52 }
  • 相关阅读:
    OC中nil、Nil、NULL、NSNull的区别
    SOJ 1050. Numbers & Letters
    SOJ 1009. Mersenne Composite N
    SOJ 1006. Team Rankings
    SOJ 1036. Crypto Columns
    SOJ 1151. 魔板
    SOJ 1007. To and Fro
    SOJ 1150.简单魔板
    SOJ 1051 Biker's Trip Odometer
    SOJ 1176 Two Ends
  • 原文地址:https://www.cnblogs.com/wbxk/p/7682680.html
Copyright © 2011-2022 走看看