zoukankan      html  css  js  c++  java
  • java手写线程池,完善中

    package com.test001.threadpool;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.Vector;
    import java.util.concurrent.Callable;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * A small hand-written thread pool: a bounded task queue drained by a fixed set of
     * {@link Worker} threads. Each submitted {@link Callable} is paired with a {@code Future}
     * through which the caller can wait for the result.
     *
     * <p>The pool is a process-wide singleton obtained via {@link #getThreadPool()}.
     *
     * @param <T> result type produced by the submitted tasks
     */
    public class ThreadPool <T>{
        /** How long an idle worker blocks on the queue before re-checking its close flag. */
        private static final long IDLE_POLL_MILLIS = 200L;

        /** The singleton instance; wildcard-typed because a static field cannot use {@code T}. */
        private static ThreadPool<?> threadPool;

        private final int maxRunThreadNum;
        private final int maxWaitTaskNum;
        private final int minActiveThreadNum;
        /** Seconds {@link #addTask(Callable)} waits for free queue space before giving up. */
        private final int addTaskTimeout;
        private final Vector<Worker> workerList;
        private final LinkedBlockingQueue<Task<T>> taskBlockingDeque;
        /** Number of workers currently executing a task. */
        private final AtomicLong workingNum = new AtomicLong();

        /** Initializes the default parameters and starts the minimum number of workers. */
        private ThreadPool(){
            maxRunThreadNum = 10;
            maxWaitTaskNum = 50;
            minActiveThreadNum = 5;
            addTaskTimeout = 5;
            workerList = new Vector<>(maxRunThreadNum);
            taskBlockingDeque = new LinkedBlockingQueue<>(maxWaitTaskNum);
            createWorker(minActiveThreadNum);
        }

        /**
         * Returns the shared pool, creating it on first use.
         *
         * @param <T> result type the caller intends to use; the single shared instance is
         *            returned regardless of which type the first caller asked for
         * @return the singleton pool
         */
        @SuppressWarnings("unchecked") // one erased instance is shared by all type arguments
        public synchronized static <T> ThreadPool<T> getThreadPool(){
            if(threadPool==null){
                threadPool = new ThreadPool<T>();
            }
            return (ThreadPool<T>) threadPool;
        }

        /** Asks every worker to stop and interrupts it so a blocked worker wakes up at once. */
        public synchronized void close(){
            for (Worker worker:workerList){
                worker.close();
                worker.interrupt();
            }
        }

        /**
         * Starts {@code num} additional workers, all or nothing.
         *
         * @param num number of workers to add
         * @return {@code true} if the workers were started, {@code false} if adding them
         *         would exceed {@code maxRunThreadNum}
         */
        private synchronized boolean createWorker(int num){
            // "<=" so that the pool is allowed to grow to exactly maxRunThreadNum workers.
            if(workerList.size()+num<=maxRunThreadNum){
                for (int i=0;i<num;i++) {
                    Worker worker = new Worker();
                    workerList.add(worker);
                    worker.start();
                }
                return true;
            }
            return false;
        }

        /**
         * Queues a task for execution.
         *
         * @param callable the work to run; must not be {@code null}
         * @return the future for the task's result, or {@code null} if the queue stayed full
         *         for {@code addTaskTimeout} seconds and the task was rejected
         * @throws NullPointerException if {@code callable} is {@code null}
         * @throws Exception if the calling thread is interrupted while waiting for queue space
         */
        public Future<T> addTask(Callable<T> callable) throws Exception {
            // TODO: compare started workers against the maximum, and busy workers against
            // started ones, to decide whether to start more workers here.
            if(callable==null){
                throw new NullPointerException("callable");
            }
            System.out.println("add");
            Task<T> task = new Task<>(callable);
            boolean flag = taskBlockingDeque.offer(task,addTaskTimeout, TimeUnit.SECONDS);
            if(!flag) return null;
            return task.getFuture();
        }

        /**
         * A queued unit of work: the callable plus the future its result is published to.
         *
         * @param <V> result type of the callable
         */
        class Task<V>{
            private Callable<V> callable;
            private Future<V> future;

            public Task(Callable<V> callable){
                this.callable = callable;
                this.future = new Future<>();
            }

            public Callable<V> getCallable() {
                return callable;
            }

            public void setCallable(Callable<V> callable) {
                this.callable = callable;
            }

            public Future<V> getFuture() {
                return future;
            }

            public void setFuture(Future<V> future) {
                this.future = future;
            }
        }

        /** A pool thread that repeatedly takes tasks off the queue and runs them. */
        class Worker extends Thread{
            // volatile: written by the closing thread, read by this worker's loop.
            private volatile boolean closeFlag = false;

            @Override
            public void run() {
                while(!closeFlag){
                    Task<T> task;
                    try {
                        // Block (with a timeout) instead of spinning on isEmpty(); the timeout
                        // lets the loop notice closeFlag even if nobody interrupts the thread.
                        task = taskBlockingDeque.poll(IDLE_POLL_MILLIS, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // Interrupted while idle (e.g. by ThreadPool.close()); re-check closeFlag.
                        continue;
                    }
                    if(task!=null){
                        execute(task);
                    }
                }
            }

            /**
             * Runs one task and always publishes an outcome to its future. If the callable
             * throws, the stack trace is printed and the future is completed with
             * {@code null} so that callers blocked in {@code get()} are released.
             */
            private void execute(Task<T> task){
                Future<T> future = task.getFuture();
                T result = null;
                workingNum.incrementAndGet();
                try {
                    result = task.getCallable().call();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    workingNum.decrementAndGet();
                    future.set(result);
                }
            }

            /** Requests that this worker exit after its current task (or idle wait) ends. */
            public void close(){
                closeFlag = true;
            }
        }

        /** Demo: submits 50 tasks that sleep 1..50 seconds and prints each result. */
        public static void main(String[] args) throws Exception {
            long startTime = System.currentTimeMillis(); // start time

            ThreadPool<Integer> t = ThreadPool.getThreadPool();
            List<Future<Integer>> futures = new LinkedList<>();
            try {
                for (int i=1;i<=50;i++){
                    Integer data = i;
                    Future<Integer> f = t.addTask(()->{
                        int num = data;
                        Thread.sleep(num*1000L);
                        System.out.println("end\tdata="+data+"\tthread="+Thread.currentThread().getName());
                        return num;
                    });
                    futures.add(f);
                }
                int i =0;
                for(Future<Integer> f :futures){
                    i++;
                    // addTask returns null when the queue stayed full; there is no result to wait for.
                    System.out.println("get data\t"+i+"="+(f==null ? null : f.get()));
                }
            } finally {
                // Always stop the (non-daemon) workers so the JVM can exit.
                t.close();
            }
            long endTime = System.currentTimeMillis(); // end time
            System.out.println("程序运行时间:" + (endTime - startTime) + "ms"); // print elapsed time
        }

    }
    
    /**
     * A one-shot result holder: a producer publishes a value with {@link #set(Object)} and
     * any number of consumers wait for it with {@link #get()}.
     *
     * @param <T> type of the result
     */
    class Future<T>{
        // Set to true once a result has been published; volatile so get() can use it as a
        // lock-free fast path (the write to t happens-before the write to hasResp).
        private volatile boolean hasResp = false;
        private T t;
        // Starts with zero permits: get() blocks until set() releases one. Starting at zero
        // (instead of acquiring in the constructor) cannot be defeated by an interrupt.
        private final Semaphore semaphore = new Semaphore(0);

        public Future(){
        }

        /**
         * Waits until a result has been published, then returns it. May be called any
         * number of times and from any number of threads.
         *
         * @return the published value (may be {@code null})
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public T get() throws InterruptedException {
            if(!hasResp){
                semaphore.acquire();
                // Hand the permit straight back so later or concurrent get() calls also proceed.
                semaphore.release();
            }
            return t;
        }

        /**
         * Publishes the result and wakes up waiting callers.
         *
         * @param t the value to hand to {@link #get()} callers
         */
        protected void set(T t){
            this.t = t;
            hasResp = true;
            semaphore.release();
        }

    }
  • 相关阅读:
    JavaScript:事件
    JavaScript系统对象
    DOM基础:table(表格)
    DOM基础
    Cookie的简单实用
    javascript:变量的作用域
    javascript:没有定义的变量和没有定义的属性
    数组的基本使用
    静态代码块、代码块、构造函数、匿名内部类、匿名内部类中的代码块
    java使用指定的国际化文件
  • 原文地址:https://www.cnblogs.com/mengxingxinqing/p/10697241.html
Copyright © 2011-2022 走看看