zoukankan      html  css  js  c++  java
  • SimpleThreadPool给线程池增加自动扩充线程数量,以及闲时自动回收的功能

    给线程池增加自动扩充线程数量,以及闲时自动回收的功能

    package com.dwz.concurrency.chapter13;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    
    public class SimpleThreadPool4 extends Thread {
        private static int size;
        private final int queueSize;
        private final static int DEFAULT_TASK_QUEUE_SIZE = 2000;
        private static volatile int seq = 0;
        private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
        private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
        private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
        private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
        private final DiscardPolicy discardPolicy;
        private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
            throw new DiscardException("Discard this task.");
        };
        private volatile boolean destroy = false;
        private int min;
        private int max;
        private int active;
    
        public SimpleThreadPool4() {
            this(4, 8, 12, DEFAULT_TASK_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
        }
    
        public SimpleThreadPool4(int min, int active, int max, int queueSize, DiscardPolicy discardPolicy) {
            this.min = min;
            this.active = active;
            this.max = max;
            this.queueSize = queueSize;
            this.discardPolicy = discardPolicy;
            init();
        }
    
        private void init() {
            for (int i = 0; i < this.min; i++) {
                createWorkTask();
            }
            this.size = min;
            this.start();
        }
    
        public void submit(Runnable runnable) {
            if (destroy) {
                throw new IllegalStateException("The thread pool already destroy and not allow submit task.");
            }
            synchronized (TASK_QUEUE) {
                if (TASK_QUEUE.size() >= this.queueSize) {
                    discardPolicy.discard();
                }
                TASK_QUEUE.addLast(runnable);
                TASK_QUEUE.notifyAll();
            }
        }
    
        @Override
        public void run() {
            while (!destroy) {
                System.out.printf("Pool#Min:%d,Active:%d,Max:%d,Current:%d,QueueSize:%d,ThreadQueueSize::%d
    ", 
                        this.min, this.active, this.max, this.size, TASK_QUEUE.size(), THREAD_QUEUE.size());
                try {
                    Thread.sleep(5_000L);
                    if(TASK_QUEUE.size() > this.active && this.size < this.active) {
                        for(int i = this.size; i < this.active; i++) {
                            createWorkTask();
                        }
                        System.err.printf("The pool incremented to active. currentSize:%d
    ", this.size);
                        this.size = this.active;
                    } else if(TASK_QUEUE.size() > this.max && this.size < this.max) {
                        for(int i = this.size; i < this.max; i++) {
                            createWorkTask();
                        }
                        System.err.printf("The pool incremented to max currentSize:%d
    ", this.size);
                        this.size = this.max;
                    }
                    
                    if(TASK_QUEUE.isEmpty() && this.size > this.active) {
                        System.out.println("===============Reduce===============");
                        synchronized (THREAD_QUEUE) {
                            int releaseSize = this.size - this.active;
                            for(Iterator<WorkerTask> it = THREAD_QUEUE.iterator(); it.hasNext();) {
                                if(releaseSize <= 0) {
                                    break;
                                }
                                WorkerTask task = it.next();
                                task.close();
                                task.interrupt();
                                it.remove();
                                releaseSize--;
                            }
                            this.size = this.active;
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void createWorkTask() {
            WorkerTask task = new WorkerTask(GROUP, THREAD_PREFIX + (seq++));
            task.start();
            THREAD_QUEUE.add(task);
        }
    
        public void shutdown() throws InterruptedException {
            while (!TASK_QUEUE.isEmpty()) {
                Thread.sleep(50);
            }
            synchronized (THREAD_QUEUE) {
                int initVal = THREAD_QUEUE.size();
                while (initVal > 0) {
                    for (WorkerTask task : THREAD_QUEUE) {
                        if (task.getTaskState() == TaskState.BLOCKED) {
                            task.interrupt();
                            task.close();
                            initVal--;
                        } else {
                            Thread.sleep(10);
                        }
                    }
                }
            }
            this.destroy = true;
            System.out.println("The thread pool disposed.");
            System.err.println("THREAD_QUEUE size is: " + THREAD_QUEUE.size());
            System.out.println("GROUP.activeCount() is: " + GROUP.activeCount());
        }
    
        public int getSize() {
            return size;
        }
    
        public int getQueueSize() {
            return queueSize;
        }
        
        public int getMin() {
            return min;
        }
    
        public int getMax() {
            return max;
        }
    
        public int getActive() {
            return active;
        }
    
        public boolean isDestroy() {
            return destroy;
        }
    
        private enum TaskState {
            FREE, RUNNING, BLOCKED, DEAD
        }
    
        public static class DiscardException extends RuntimeException {
    
            public DiscardException(String message) {
                super(message);
            }
        }
    
        public interface DiscardPolicy {
            void discard() throws DiscardException;
        }
    
        private static class WorkerTask extends Thread {
            private volatile TaskState taskState = TaskState.FREE;
    
            public WorkerTask(ThreadGroup group, String name) {
                super(group, name);
            }
    
            public TaskState getTaskState() {
                return this.taskState;
            }
    
            @Override
            public void run() {
                OUTER: while (this.taskState != TaskState.DEAD) {
                    Runnable runnable = null;
                    synchronized (TASK_QUEUE) {
                        while (TASK_QUEUE.isEmpty()) {
                            try {
                                this.taskState = TaskState.BLOCKED;
                                TASK_QUEUE.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                                System.err.println("closed.");
                                // 线程被打断回到OUTER位置
                                break OUTER;
                            }
                        }
                        runnable = TASK_QUEUE.removeFirst();
                    }
    
                    if (runnable != null) {
                        System.out.println("runnable into...");
                        this.taskState = TaskState.RUNNING;
                        runnable.run();
                        this.taskState = TaskState.FREE;
                    }
                }
            }
    
            public void close() {
                this.taskState = TaskState.DEAD;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            SimpleThreadPool4 threadPool = new SimpleThreadPool4();
            for (int i = 0; i < 100; i++) {
                threadPool.submit(() -> {
                    System.out.println("The runnable be serviced by " + Thread.currentThread() + " start.");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("The runnable be serviced by " + Thread.currentThread() + " finished.");
                });
            }
        }
    }
  • 相关阅读:
    验证码学习笔记
    字符串String类
    密封关键字sealed
    比较两个对象是否为同一个对象
    Unity让带有Rigidbody组件的游戏对象停止运动
    Unity3d-制作粒子光环特效
    Unity3d实现的十字路口的模拟(三)
    kinect v2
    Unity3D 之射线检测
    FFmpeg X264的preset和tune
  • 原文地址:https://www.cnblogs.com/zheaven/p/12090939.html
Copyright © 2011-2022 走看看