zoukankan      html  css  js  c++  java
  • 手写简易线程池

    /**手写简易的线程池
     * @author 胡庆安
     * @version V1.0
     * @Package Thread.pool
     * @date 2020/10/23 10:57
     * @Copyright © 2010-2020 爬山虎科技(扬州)股份有限公司
     */
    public class HqaPool {
    
        /**
         * 核心线程数
         */
        int coreSize;
        /**
         * 队列存放最大任务数
         */
        int capcity;
        /**
         * 闲置多久线程会被会被回收
         */
        long times;
    
        HashSet<Task> tasks = new HashSet<>();
    
        BlockingQueue queue;
    
        public HqaPool(int coreSize, int capcity, long times) {
            this.coreSize = coreSize;
            this.capcity = capcity;
            this.times = times;
            queue = new BlockingQueue(capcity);
        }
    
        //执行任务
        public void execute(Runnable task){
            synchronized (tasks){
                Task taskRe = new Task(task);
                //如果任务没有超过coreSize,直接交给Task对象执行
                if(tasks.size()<coreSize){
                    tasks.add(taskRe);
                    taskRe.start();
                }else {//如果任务超过了coreSize,直接添加到队列等待
                    queue.put(taskRe);
                    System.out.println("任务"+taskRe+"被添加到了队列里");
                }
            }
        }
    
        /**
         * 阻塞任务队列(存放任务)
         */
        class BlockingQueue{
    
            private Deque<Task> deque = new ArrayDeque<Task>();
            public ReentrantLock lock = new ReentrantLock();
            private int capcity;
            //生产者条件变量
            private Condition fullWaitSet = lock.newCondition();
            //消费者条件变量
            private Condition emptyWaitSet = lock.newCondition();
    
            public BlockingQueue(int capcity) {
                this.capcity = capcity;
    
            }
    
            //任务放入阻塞队列
            public void put(Task task){
                lock.lock();
                try {
                    while (deque.size() == capcity){
                        try {
                            fullWaitSet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    deque.add(task);
                    emptyWaitSet.signal();
                }finally {
                    lock.unlock();
                }
    
            }
    
            //任务取出阻塞队列
            public Task take(){
                Task task;
                lock.lock();
                try {
                    while (deque.size() == 0){
                        try {
                            emptyWaitSet.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    task= deque.removeFirst();
                    fullWaitSet.signal();
                }finally {
                    lock.unlock();
                }
    
                return task;
            }
    
            //获取阻塞队列当前的size
            public int size(){
                lock.lock();
                try {
                    return deque.size();
                }finally {
                    lock.lock();
                }
    
            }
    
        }
    
        /**
         * 线程池任务
         */
        class Task extends Thread{
            private Runnable task;
    
            public Task(Runnable task) {
                this.task = task;
            }
    
            @Override
            public void run() {
                while (task!=null||(task=queue.take()) != null){//请细品
                    try {
                        System.out.println("任务"+task+"被执行了");
                        task.run();
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        task = null;
                    }
                }
            }
        }
    }

    测试main方法:

    public static void main(String[] args) {
            HqaPool pool = new HqaPool(2, 10, 1000);
            for (int i = 0; i < 15; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }

    注意上面的程序当线程逻辑执行完之后程序不会结束,会一直阻塞在task=queue.take()那里,所以我们手写线程池2.0 的版本会给阻塞队列添加超时获取的功能以及拒绝策略等功能。。。。。。。

     

    下面是2.0 版本:

    /**
     * @author 胡庆安
     * @version V2.0
     * @Package Thread.pool
     * @date 2020/10/23 14:01
     * @Copyright © 2010-2020 爬山虎科技(扬州)股份有限公司
     */
    public class ThreadPool {
        Logger log = LoggerFactory.getLogger(ThreadPool.class);
        // 任务队列
        private BlockingQueue<Runnable> taskQueue;
        // 线程集合
        private HashSet<Worker> workers = new HashSet<>();
        // 核心线程数
        private int coreSize;
        // 获取任务时的超时时间
        private long timeout;
        private TimeUnit timeUnit;
    
        // 执行任务
        public void execute(Runnable task) {
            // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
            // 如果任务数超过 coreSize 时,加入任务队列暂存
            synchronized (workers) {
                if (workers.size() < coreSize) {
                    Worker worker = new Worker(task);
                    log.debug("新增 worker{}, {}", worker, task);
                    workers.add(worker);
                    worker.start();
                } else {
                    // taskQueue.put(task);
                    // 1) 死等
                    // 2) 带超时等待
                    // 3) 让调用者放弃任务执行
                    // 4) 让调用者抛出异常
                    // 5) 让调用者自己执行任务
                    taskQueue.offer(task,1000,TimeUnit.SECONDS);
                }
            }
        }
    
        public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.taskQueue = new BlockingQueue<>(queueCapcity);
        }
    
        class Worker extends Thread {
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            @Override
            public void run() {
                // 执行任务
                // 1) 当 task 不为空,执行任务
                // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
                // while(task != null || (task = taskQueue.take()) != null) {
                while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                    try {
                        log.debug("正在执行...{}", task);
                        task.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        task = null;
                    }
                }
                synchronized (workers) {
                    log.debug("worker 被移除{}", this);
                    workers.remove(this);
                }
            }
        }
    
        class BlockingQueue<T> {
            // 1. 任务队列
            private Deque<T> queue = new ArrayDeque<>();
            // 2. 锁
            private ReentrantLock lock = new ReentrantLock();
            // 3. 生产者条件变量
            private Condition fullWaitSet = lock.newCondition();
            // 4. 消费者条件变量
            private Condition emptyWaitSet = lock.newCondition();
            // 5. 容量
            private int capcity;
    
            public BlockingQueue(int capcity) {
                this.capcity = capcity;
            }
    // 带超时阻塞获取
    public T poll(long timeout, TimeUnit unit) {
    lock.lock();
    try {
    // 将 timeout 统一转换为 纳秒
    long nanos = unit.toNanos(timeout);
    while (queue.isEmpty()) {
    try {
    // 返回值是剩余时间
    if (nanos <= 0) {
    return null;
                            }
    nanos = emptyWaitSet.awaitNanos(nanos);
                        } catch (InterruptedException e) {
    e.printStackTrace();
                        }
                    }
    T t = queue.removeFirst();
    fullWaitSet.signal();
    return t;
                } finally {
    lock.unlock();
                }
            }
    // 阻塞获取
    public T take() {
    lock.lock();
    try {
    while (queue.isEmpty()) {
    try {
    emptyWaitSet.await();
                        } catch (InterruptedException e) {
    e.printStackTrace();
                        }
                    }
    T t = queue.removeFirst();
    fullWaitSet.signal();
    return t;
                } finally {
    lock.unlock();
                }
            }
    // 阻塞添加
    public void put(T task) {
    lock.lock();
    try {
    while (queue.size() == capcity) {
    try {
    log.debug("等待加入任务队列 {} ...", task);
    fullWaitSet.await();
                        } catch (InterruptedException e) {
    e.printStackTrace();
                        }
                    }
    log.debug("加入任务队列 {}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
                } finally {
    lock.unlock();
                }
            }
    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    lock.lock();
    try {
    long nanos = timeUnit.toNanos(timeout);
    while (queue.size() == capcity) {
    try {
    if (nanos <= 0) {
    return false;
                            }
    log.debug("等待加入任务队列 {} ...", task);
    nanos = fullWaitSet.awaitNanos(nanos);
                        } catch (InterruptedException e) {
    e.printStackTrace();
                        }
                    }
    log.debug("加入任务队列 {}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    return true;
                } finally {
    lock.unlock();
                }
            }
    public int size() {
    lock.lock();
    try {
    return queue.size();
                } finally {
    lock.unlock();
                }
            }
        }
    }
  • 相关阅读:
    UIProgressView的详细使用
    Android拍照上传代码样例
    UILabel的详细使用及特殊效果
    TextView属性android:ellipsize实现跑马灯效果
    Android中WebView实现Javascript调用Java类方法
    有效获取状态栏(StatusBar)高度
    详解iPhone Tableview分批显示数据
    TextView显示插入的图片
    ObjectiveC语法快速参考
    UISegmentedControl的详细使用
  • 原文地址:https://www.cnblogs.com/huqingan/p/13891175.html
Copyright © 2011-2022 走看看