zoukankan      html  css  js  c++  java
  • 二 Java利用等待/通知机制实现一个线程池

    接着上一篇博客的 一Java线程的等待/通知模型  ,没有看过的建议先看一下。下面我们用等待通知机制来实现一个线程池.

    本文的代码放到了github上,地址如下: git@github.com:jiulu313/ThreadPool.git

    线程的任务就以打印一行文本来模拟耗时的任务。主要代码如下:

    1  定义一个任务的接口。

    1 /*
    2  * 任务的接口
    3  */
    4 public interface Task {
    5     void doSomething();
    6 }

    2  实现一个具体的任务。

     1 /*
     2  * 具体的任务
     3  */
     4 public class PrintTask implements Task{
     5     
     6     //打印一句话,睡一秒,来模拟耗时的任务
     7     @Override
     8     public void doSomething() {
     9         System.out.println("任务:"+Thread.currentThread().getName());
    10         try {
    11             Thread.sleep(1000);
    12         } catch (InterruptedException e) {
    13             e.printStackTrace();
    14         }
    15     }
    16 }

    3  实现工作线程

     1 /*
     2  * 工作者线程
     3  */
     4 public class Worker implements Runnable {
     5     //线程是否正在运行
     6     private boolean running = true;        
     7     
     8     //保存Thread,方便start()
     9     private Thread thread;
    10     
    11     //保存线程池的任务队列,作同步用
    12     private LinkedList<Task> tasks;
    13 
    14     public void setThread(Thread thread) {
    15         this.thread = thread;
    16     }
    17 
    18     public void setTasks(LinkedList<Task> tasks) {
    19         this.tasks = tasks;
    20     }
    21 
    22     //启动此工作线程
    23     public void start() {
    24         if (thread != null) {
    25             thread.start();
    26         }
    27     }
    28 
    29     // 关闭此工作线程
    30     public void shutDown() {
    31         running = false;
    32         thread.interrupt();
    33     }
    34 
    35     @Override
    36     public void run() {
    37         while (running) {
    38             Task task = null;
    39             
    40             //对共享变量加锁,此处为任务队列,因为会有多个线程访问
    41             synchronized (tasks) {
    42                 
    43                 //当条件不满足时,线程等待,见上一篇博文
    44                 while (tasks.isEmpty()) {
    45                     try {
    46                         //线程进入等待状态,并且释放锁
    47                         tasks.wait();
    48                     } catch (InterruptedException e) {
    49                         //感知到外部对此线程的中断操作
    50                         Thread.currentThread().interrupt();
    51                         return;
    52                     }
    53                 }
    54 
    55                 //条件满足
    56                 task = tasks.removeFirst();
    57             }
    58 
    59             //执行任务
    60             if (task != null) {
    61                 task.doSomething();
    62             }
    63         }
    64     }
    65 }

    4  创建一个线程池

      1 import java.util.ArrayList;
      2 import java.util.LinkedList;
      3 import java.util.List;
      4 
      5 public class DefaultThreadPool implements ThreadPool {
      6     private int maxWorksNum = 10;
      7     private int minWorksNum = 1;
      8     private int defaultWorksNum = 5;
      9 
     10     // 任务列表
     11     private LinkedList<Task> tasks = new LinkedList<>();
     12 
     13     // 工作线程列表
     14     private LinkedList<Worker> workers = new LinkedList<>();
     15 
     16     //工作线程个数
     17     private int workerNum = defaultWorksNum;
     18     
     19 
     20     @Override
     21     public void excute(Task task) {
     22         // 添加一个工作,然后进行通知
     23         if (task != null) {
     24             synchronized (tasks) {
     25                 //添加到最后一个位置 
     26                 tasks.addLast(task);
     27                 //通知等待的线程,有新的任务了
     28                 tasks.notify();
     29             }
     30         }
     31     }
     32 
     33     // 关闭线程池
     34     @Override
     35     public void shutDown() {
     36         for (Worker worker : workers) {
     37             worker.shutDown();
     38         }
     39     }
     40 
     41     // 初始化工作者线程
     42     public void initWorkers(int num) {
     43         if (num > maxWorksNum) {
     44             num = maxWorksNum;
     45         } else if (num < minWorksNum) {
     46             num = minWorksNum;
     47         } else {
     48             num = defaultWorksNum;
     49         }
     50 
     51         for (int i = 0; i < workerNum; i++) {
     52             //创建工作线程
     53             Worker worker = new Worker();
     54             
     55             //添加到工作队列
     56             workers.add(worker);
     57             
     58             //新建一个线程对象,并将worker赋值
     59             Thread thread = new Thread(worker);
     60             
     61             //设置线程对象,作启动,中断用
     62             worker.setThread(thread);
     63             
     64             //设置任务队列,作同步用
     65             worker.setTasks(tasks);
     66         }
     67     }
     68 
     69     // 启动线程池
     70     public void start(){
     71         if(workers != null){
     72             for(Worker worker : workers){
     73                 //启动一个工作线程
     74                 worker.start();
     75             }
     76         }
     77     }
     78     
     79     // 新增加工作线程,但是不能大于线程池最大线程数
     80     @Override
     81     public void addWorkers(int num) {
     82         if (num <= 0) {
     83             return;
     84         }
     85 
     86         int remain = maxWorksNum - workerNum;
     87         if (num > remain) {
     88             num = remain;
     89         }
     90 
     91         for (int i = 0; i < num; i++) {
     92             Worker worker = new Worker();
     93             workers.add(worker);
     94             Thread thread = new Thread(worker);
     95             thread.start();
     96         }
     97         
     98         workerNum = workers.size();
     99     }
    100 
    101     // 减少工作线程,至少留1个,不能减少到0
    102     @Override
    103     public void removeWorkers(int num) {
    104         if(num >= workerNum || num <= 0){
    105             return;
    106         }
    107         
    108         for(int i =0;i<num;i++){
    109             Worker worker = workers.getLast();
    110             worker.shutDown();
    111         }
    112         
    113         workerNum = workers.size();
    114     }
    115 
    116     @Override
    117     public int getTaskSize() {
    118         return tasks.size();
    119     }
    120 
    121     
    122 }

    5  新建测试类

     1 public class ThreadPoolTest {
     2     public static void main(String[] args) throws InterruptedException {
     3         //1 新建一个线程池
     4         DefaultThreadPool pool = new DefaultThreadPool();
     5         
     6         //2 设置线程池的大小为5
     7         pool.initWorkers(5);
     8         
     9         //3 启动线程池
    10         pool.start();
    11         
    12         //4 往任务队列里面添加任务
    13         for(int i = 0;i<100;i++){
    14             pool.excute(new PrintTask());
    15         }
    16     
    17     }
    18 }

    在eclipse中运行,结果部分截图如下:

     好了,一个线程池就这样,这只是一个小例子,提示线程池的原理

    其实实现工作中,一个线程池要考虑的问题远比这个多,也更复杂。

    其中比较重要的两个方面:

    1 线程池开几个线程为最合适?

    我们知道,线程不是开的越多越好,而要根据业务的场景,硬件的指标,带宽的大小等等

    一般线程池的个数为CPU核心数的个数加1 ,google的建议。此外可能还要具体分析业务

    大,中,小的业务需求,也是不一样的。

    大任务:比如下载一部电影,可能要十几分钟甚至几十分钟的任务

    中任务:比如下载一幅图片,有1M以上了到十几M的大小的。

    小任务:比如下载的是游戏的ico,就十几K的到1M以下的。

    小任务可以多开几个线程。

    中任务的可以保守点。

    大任务的尽量不要开的线程太多

    具体值还需要看具体业务,具体场景。这些只是建议。

    2 线程用哪种队列,也是和上面有关系。

    今天就到这了,后续还会抽时间研究线程并发这块,希望对大家有帮忙。

    欢迎访问作者的helloworld的个人博客:
    https://www.helloworld.net/jiulu

    同时也可以加作者的微信:daitukeji
    也可以扫下面的二维码添加
    ![image](https://img-hello-world.oss-cn-beijing.aliyuncs.com/imgs/f0b62fd75da7cbeba3b77965f4e26845.png)

  • 相关阅读:
    Oracle函数如何把符串装换为小写的格式
    Oralce中的synonym同义词
    JS中getYear()的兼容问题
    How to do SSH Tunneling (Port Forwarding)
    所谓深度链接(Deep linking)
    upload size of asp.net
    发一个自动刷网站PV流量的小工具
    解决Visual Studio 2008 下,打开.dbml(LINQ) 文件时,提示"The operation could not be completed." 的问题。
    在资源管理器中使鼠标右键增加一个命令,运行cmd,同时使得当前路径为资源管理器当前的目录
    使用SQL语句获取Sql Server数据库的版本
  • 原文地址:https://www.cnblogs.com/start1225/p/5870072.html
Copyright © 2011-2022 走看看