zoukankan      html  css  js  c++  java
  • 线程池简单实现

    实现了一个简化版的线程池。

    实现线程池的关键有两个:一是阻塞队列,用于任务的存取,二是内部的线程对象如何持续性的执行任务,并在空闲时被回收。

    线程池代码:

      1 package learnConcurrent;
      2 
      3 import java.util.ArrayList;
      4 import java.util.Collection;
      5 import java.util.LinkedList;
      6 import java.util.List;
      7 import java.util.concurrent.ArrayBlockingQueue;
      8 import java.util.concurrent.BlockingQueue;
      9 import java.util.concurrent.Callable;
     10 import java.util.concurrent.ExecutionException;
     11 import java.util.concurrent.ExecutorService;
     12 import java.util.concurrent.Future;
     13 import java.util.concurrent.TimeUnit;
     14 import java.util.concurrent.TimeoutException;
     15 import java.util.concurrent.atomic.AtomicBoolean;
     16 import java.util.concurrent.atomic.AtomicInteger;
     17 import java.util.concurrent.locks.ReentrantLock;
     18 
     19 public class MyThreadPool implements ExecutorService{
     20     //线程队列
     21     private List<Worker> workers;
     22     //任务队列
     23     private BlockingQueue<Runnable> rQueue;
     24     //线程池核心大小
     25     private int corePoolSize;
     26     //线程池最大大小
     27     private int maxPoolSize;
     28     //空闲线程最长存活时间
     29     private int keepAliveTime = 60;
     30 
     31     private static final int ALIVE = 0;
     32     
     33     private static final int SHUTDOMN = 1;
     34     
     35     private int state = ALIVE;
     36     
     37     private ReentrantLock lock = new ReentrantLock();
     38     
     39     public MyThreadPool(int corePoolSize, int maxPoolSize){
     40         this.corePoolSize = corePoolSize;
     41         this.maxPoolSize = maxPoolSize;
     42         
     43         this.workers = new LinkedList<Worker>();
     44         //阻塞队列,最大容量为maxPoolSize
     45         this.rQueue = new ArrayBlockingQueue<Runnable>(maxPoolSize, true);
     46     }
     47     
     48     @Override
     49     public void execute(Runnable command) {
     50         if(isShutdown())
     51             return;
     52         //FIXME size在获取时和判断时 可能发生改变
     53         lock.lock();
     54         int size = workers.size();
     55         if(size < corePoolSize){//当线程池线程数小于核心数量时,增加线程
     56             addWorker();
     57         }else if(size < maxPoolSize && !rQueue.isEmpty()){//当线程大于核心数量且任务队列中任务排队时,增加线程
     58             addWorker();
     59         }
     60         lock.unlock();
     61         
     62         rQueue.offer(command);
     63     }
     64     
     65     @Override
     66     public void shutdown() {
     67         //关闭线程池的简单实现,设置状态让任务队列不在接受任务,线程也会因为超时被回收
     68         //缺点时空闲的线程资源得不到立即释放
     69         lock.lock();
     70         state = SHUTDOMN;
     71         lock.unlock();
     72     }
     73     
     74     /**
     75      * 立即停止线程池,试图停止正在活动的线程,返回还在等待的任务列表
     76      */
     77     @Override
     78     public List<Runnable> shutdownNow() {
     79         if(isShutdown())
     80             return null;
     81         lock.lock();
     82         state = SHUTDOMN;
     83         List<Runnable> restRunnable = new ArrayList<Runnable>();
     84         while(!rQueue.isEmpty()){
     85             restRunnable.add(rQueue.poll());
     86         }
     87         for(Worker w : workers){
     88             w.interrupt();
     89         }
     90         lock.unlock();
     91         return restRunnable;
     92     }
     93 
     94     @Override
     95     public boolean isShutdown() {
     96         lock.lock();
     97         boolean res = state != ALIVE;
     98         lock.unlock();
     99         return res;
    100     }
    101 
    102     @Override
    103     public boolean isTerminated() {
    104         return isShutdown() && rQueue.isEmpty();
    105     }
    106 
    107     @Override
    108     public boolean awaitTermination(long timeout, TimeUnit unit)
    109             throws InterruptedException {
    110         // TODO Auto-generated method stub
    111         return false;
    112     }
    113 
    114     @Override
    115     public <T> Future<T> submit(Callable<T> task) {
    116         // TODO Auto-generated method stub
    117         return null;
    118     }
    119 
    120     @Override
    121     public <T> Future<T> submit(Runnable task, T result) {
    122         // TODO Auto-generated method stub
    123         return null;
    124     }
    125 
    126     @Override
    127     public Future<?> submit(Runnable task) {
    128         return null;
    129     }
    130 
    131     @Override
    132     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    133             throws InterruptedException {
    134         return null;
    135     }
    136 
    137     @Override
    138     public <T> List<Future<T>> invokeAll(
    139             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    140             throws InterruptedException {
    141         return null;
    142     }
    143 
    144     @Override
    145     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    146             throws InterruptedException, ExecutionException {
    147         return null;
    148     }
    149 
    150     @Override
    151     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    152             long timeout, TimeUnit unit) throws InterruptedException,
    153             ExecutionException, TimeoutException {
    154         return null;
    155     }
    156     
    157     private Runnable getTask(){
    158         Runnable r = null;
    159         try {
    160             r = rQueue.poll(keepAliveTime, TimeUnit.SECONDS);
    161         } catch (InterruptedException e) {
    162             // TODO Auto-generated catch block
    163             e.printStackTrace();
    164         }
    165         return r;
    166     }
    167     
    168     private void addWorker(){
    169         Worker w = new Worker();
    170         w.start();
    171         lock.lock();
    172         workers.add(w);
    173         lock.unlock();
    174     }
    175     
    176     private void removeWorker(Worker w){
    177         lock.lock();
    178         workers.remove(w);
    179         lock.unlock();
    180     }
    181     
    182     class Worker extends Thread{
    183         
    184         private AtomicBoolean isAlive = new AtomicBoolean(true);
    185         
    186         private Runnable task;
    187         
    188         
    189         @Override
    190         public void run() {
    191             while(isAlive.get()){
    192                 //阻塞一定时间,超时则回收该线程
    193                 task = getTask();
    194                 if(task != null){
    195                     task.run();
    196                 }else{
    197                     isAlive.set(false);
    198                     
    199                 }
    200                 task = null;
    201             }
    202             System.out.println("remove worker");
    203             removeWorker(this);
    204         }
    205         
    206     }
    207     
    208     
    209 }

    测试代码:

     1 package learnConcurrent;
     2 
     3 
     4 public class ThreadPoolTest {
     5     static int taskNo = 0;
     6     public static void main(String[] args) throws InterruptedException {
     7         MyThreadPool pool = new MyThreadPool(2, 5);
     8         
     9         for(int i=0; i< 50; i++){
    10             Task task = new Task(taskNo++);
    11             pool.execute(task);
    12             Thread.sleep((int)(Math.random() * 1000));
    13         }
    14         
    15     }
    16     
    17 }
    18 
    19 class Task implements Runnable{
    20     String str;
    21     public Task(int taskNo){
    22         str = "TaskNo:" + taskNo;
    23     }
    24     @Override
    25     public void run() {
    26         System.out.println(str + " start work ");
    27         //DO SOMETHING
    28         try {
    29             Thread.sleep((int)(Math.random() * 1000));
    30         } catch (InterruptedException e) {
    31             // TODO Auto-generated catch block
    32             e.printStackTrace();
    33         }
    34         
    35         System.out.println(str + " done ");
    36     }
    37     
    38 }

    虽然是继承了ExecutorService对象,但是只实现了几个接口,设计上也可能有未考虑到的问题。

    测试代码也很简陋,仅供参考。

  • 相关阅读:
    c语言博客作业04--数组
    C博客作业03--函数
    c博客作业02--循环结构
    C博客作业01--顺序分支结构
    我的第一篇博客
    java--购物车程序的面向对象设计
    c博客作业05--指针
    C博客作业04--数组
    C博客作业03--函数
    C博客作业02--循环结构
  • 原文地址:https://www.cnblogs.com/insaneXs/p/7508328.html
Copyright © 2011-2022 走看看