zoukankan      html  css  js  c++  java
  • 自实现简单线程池

      线程池在现在的系统和框架中十分常见。明白线程池的思想原理,不仅对学习线程只是有很大的帮助。对理解一些系统的线程池实现也有很大的帮助。下面是我自己简单实现的一个线程池。用以对线程的简单理解。

      线程的实现原理很简单:

        线程池对象包含以下组件:工作者队列,Job队列;

        用户通过线程池对象添加删除工作者,线程池对象维持工作者对象这个池和工作者的实际工作;

        工作者池中的线程在用户没用明确关闭前不断的从Job队列拿取job执行job。

      好了,一切看代码:

      1.以接口编程,首先创建ThreadPool接口:

        

    /**
     * 线程池接口
     * @author yum
     *
     */
    public interface ThreadPool<Job extends Runnable> {
        //执行一个Job,Job必须实现Runnable接口
        void execute(Job job);
        //关闭线程池
        void shutdown();
        //添加工作者线程
        void addWorkers(int num);
        //减少工作者线程
        void removeWorkers(int num);
        //得到真在等待执行的任务的数量
        int getJobSize();
    }

      2.实现ThreadPool接口:

    /**
     * 自定义默认线程池
     * @author yum
     *
     * @param <Job>
     */
    public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{
        
        //线程最大限制数
        private static final int MAX_WORKER_NUMBER = 10;
        //线程池默认数量
        private static final int DEFAULT_WORKER_NUMBER = 5;
        //线程池最小的数量
        private static final int MIN_WORKER_NUMBER = 1;
        //工作列表,可以向其中添加工作数
        private final LinkedList<Job> jobs = new LinkedList<>();
        //工作者列表
        private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
        //工作者线程的数量
        private int workerNum = 5;
        //线程编号
        private AtomicLong threadNum = new AtomicLong();
    
        public DefaultThreadPool() {
            initWorkers(DEFAULT_WORKER_NUMBER);
        }
    
        public DefaultThreadPool(int num){
            workerNum = num > MAX_WORKER_NUMBER?MAX_WORKER_NUMBER:num < MIN_WORKER_NUMBER?MIN_WORKER_NUMBER:num;
            initWorkers(workerNum);
        }
        
        @Override
        public void execute(Job job) {
            if(job==null)
                throw new NullPointerException();
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    
        @Override
        public void shutdown() {
            for (int i = 0; i < workers.size(); i++) {
                Worker worker = workers.get(i);
                worker.shutdown();
            }
        }
    
        @Override
        public void addWorkers(int num) {
            synchronized (jobs) {
                if(num+this.workerNum>MAX_WORKER_NUMBER)
                    num = MAX_WORKER_NUMBER - workerNum;
                initWorkers(num);
                workerNum+=num;
            }
        }
    
        @Override
        public void removeWorkers(int num) {
            synchronized (jobs) {
                if(num>=workerNum)
                    throw new IllegalArgumentException("beyond worknum");
                //按照给定的数量关闭worker
                int count = 0;
                while(count<num){
                    Worker worker = workers.get(count);
                    if(workers.remove(worker)){
                        worker.shutdown();
                        count++;
                    }
                }
                this.workerNum-=num;
            }
        }
    
        @Override
        public int getJobSize() {
            return this.workerNum;
        }
        
        //初始化线程工作者
        private void initWorkers(int num){
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "ThreadPool-Worker-"+threadNum.getAndIncrement());
                thread.start();
            }
        }
        
        //工作者线程
        class Worker implements Runnable{
            //是否工作
            private volatile boolean running = true;
            @Override
            public void run() {
                while(running){
                    Job job = null;
                    synchronized (jobs) {
                        while(jobs.isEmpty()){
                            try {
                                jobs.wait();
                            } catch (InterruptedException e) {
                                //感知到外部WorkerThread的中断,返回
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                        //取出一个Job
                        job = jobs.removeFirst();
                    }
                    if(job!=null){
                        try {
                            job.run();
                        } catch (Exception e) {
                            //忽略Job中的异常
                        }
                    }
                }
            }
            
            public void shutdown(){
                running = false;
            }
            
        }
    }

      3.测试代码如下:

    public class ThisTest {
        public static void main(String[] args) {
            
            ThreadPool<Runnable> pool = new DefaultThreadPool<>(2);
            for (int i = 0; i < 100; i++) {
                pool.execute(new countThread());
            }
        }
        
        static class countThread implements Runnable{
            private static volatile Integer count  = 0;
            private static Object object = new Object();
            
            @Override
            public void run() {
                synchronized(object){
                    count++;
                    System.err.println(count);
                }
            }
            
        }
    }

      

        

  • 相关阅读:
    webbench之使用(二)
    webbench之编译安装(一)
    Linux下四款Web服务器压力测试工具(http_load、webbench、ab、siege)介绍
    OneThink开发框架
    性能瓶颈调优
    Jmeter之Web端HTTP性能测试(九)
    RobotFramework自动化测试之脚本编写(一)
    LoadRunner之安装、破解、汉化教程(一)
    Java学习之Thread方法
    Java学习之线程通信(多线程(Lock))--生产者消费者
  • 原文地址:https://www.cnblogs.com/WeaRang/p/5433245.html
Copyright © 2011-2022 走看看