zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor源码2

    public class ThreadPoolExecutor1 extends AbstractExecutorService1 {
        // 11100000000000000000000000000000 = -536870912,  高3位表示线程池状态, 后29位表示线程个数
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;// 29
        // 2^29-1 = 00011111 11111111 11111111 11111111 = 536870911
        private static final int CAPACITY = (1 << COUNT_BITS) - 1;
        // -1 = 11111111 11111111 11111111 11111111
        // 11100000 00000000 00000000 00000000 = -536870912,运行状态
        private static final int RUNNING = -1 << COUNT_BITS;
        // 00000000000000000000000000000000,关闭状态  
        private static final int SHUTDOWN = 0 << COUNT_BITS;
        // 2^29 = 00100000 00000000 00000000 00000000 = 536870912,停止状态
        private static final int STOP = 1 << COUNT_BITS;
        // 2^30 = 01000000 00000000 00000000 00000000 = 1073741824,整理状态  
        private static final int TIDYING = 2 << COUNT_BITS;
        // 2^29+2^30,3*2^29,2^29+2^29+2^29 = 01100000 00000000 00000000 00000000 = 1610612736,终止状态
        private static final int TERMINATED = 3 << COUNT_BITS;
    
        // C的高3位,高3位表示线程池的运行状态。
        //111RUNNING:运行中,接受新任务处理队列中的任务。
        //000SHUTDOWN:不接收新任务处理队列中的任务; 
        //001STOP:不接收新任务也不处理队列中的任务还中断正在运行的任务;
        //010TIDYING:所有的任务都已经终止;011TERMINATED:terminated()方法已经执行完成 
        private static int runStateOf(int c) {
            return c & ~CAPACITY;//11100000 00000000 00000000 00000000
        }
        
        //C的低29位,跟00011111 11111111 11111111 11111111比较,低29位表示线程池中线程数,最大2^29-1。
        private static int workerCountOf(int c) {//对2^29取余,ctl加了多少次1,最大加2^29-1次。表示多少个worker已经在运行了。
            return c & CAPACITY;//00011111 11111111 11111111 11111111
        }
    
        private static int ctlOf(int rs, int wc) {//rs是状态值,wc是worker线程数量,进行或操作,就是修改状态,但是不改变数量。
            return rs | wc;
        }
    
        private static boolean runStateLessThan(int c, int s) {//ctl是不是小于某个状态值,
            return c < s;
        }
    
        /*ctl从11100000000000000000000000000000=-536870912开始,慢慢加1,一直越来越大,最后=111111111111111111111111=-1
          worker线程数量最大2^29-1=536870911个,就不能再增加了,所以ctl的范围是(-536870912,-1)一直小于0*/
        
        /*SHUTDOWN=0,
          STOP=001=2^29=536870912,
          TIDYING=010=2^30=1073741824
          RUNNING=111=-536870912
          TERMINATED=011=3*2^29=1610612736
          */
        private static boolean runStateAtLeast(int c, int s) {//ctl是不是大于某个状态值
            return c >= s;//RUNNING:111   SHUTDOWN:000    STOP:001    TIDYING:010    TERMINATED:011
        }
    
        private static boolean isRunning(int c) {//RUNNING正常状态,ctl(-536870912,-1)一直小于0,小于0就是正常
            return c < SHUTDOWN;
        }
    
        //ctl+1
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        //ctl-1
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
    
        //ctl-1直到成功。 
        private void decrementWorkerCount() {
            do {
            } while (!compareAndDecrementWorkerCount(ctl.get()));
        }
    
        private final HashSet1<Worker> workers = new HashSet1<Worker>();
        private final BlockingQueue<Runnable> workQueue;//队列。先加到workers里面去,workers满了就加到workQueue里去。
    
        private final ReentrantLock mainLock = new ReentrantLock();//锁住workers
        private final Condition termination = mainLock.newCondition();
    
        //跟踪获得的最大池大小。仅在主锁下访问。
        private int largestPoolSize;
    
        //已完成任务总数
        private long completedTaskCount;
    
        private volatile ThreadFactory threadFactory;
        private volatile RejectedExecutionHandler1 handler;
    
        private volatile long keepAliveTime;//闲置线程存活时间,闲置线程在阻塞等待队列,
        private volatile boolean allowCoreThreadTimeOut;//等待队列时候是否启用keepAliveTime超时
    
        //核心池大小 
        private volatile int corePoolSize;
    
        //最大池大小。 受CAPACITY限制。
        private volatile int maximumPoolSize;
    
        private static final RejectedExecutionHandler1 defaultHandler = new AbortPolicy();
        public static class AbortPolicy implements RejectedExecutionHandler1 {
            public AbortPolicy() {
            }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {//抛出异常,不是什么都不做
                throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
            }
        }
            
        private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
    
        //执行终结器时要使用的上下文,
        private final AccessControlContext acc;
    
        private final class Worker extends AbstractQueuedSynchronizer1 implements Runnable {//是一个AQS和runnable任务,Worker里面的方法只要里面的一个线程访问,
            //这个类永远不会被序列化,但是我们提供了一个serialversionID来禁止javac警告。
            private static final long serialVersionUID = 6138294804551838833L;
            //this worker 正在运行在的Thread。线程池里面创建的线程,不是外部线程。
            final Thread thread;
            //要运行的初始任务。
            Runnable firstTask;
            //这个worker完成的任务
            volatile long completedTasks;
    
            //一个worker就是一个任务。同时是一个AQS队列,同时是一个runnable
            Worker(Runnable firstTask) {
                //初始设置为-1来抑制中断方法的执行。unlock才变为0。只有在runWorker()方法里面先unlock置为0其他方法才能获取锁。
                //interruptIdleWorkers方法来tryLock获取锁来中断时候,是中断不了的。runWorker方法运行时才表明关联线程已启动,这时去中断关联线程才有意义,
                //lock()方法也执行不了,只能先执行unlock()方法,才能去获取锁。
                setState(-1); 
                this.firstTask = firstTask;//executorService1.execute(new Runnable() = firstTask)
                this.thread = getThreadFactory().newThread(this);//this = ThreadPoolExecutor1$Worker,返回new Thread(ThreadPoolExecutor1$Worker)
            }
    
            public void run() {
                try {
                    runWorker(this);//this = ThreadPoolExecutor1$Worker。这个方法在自己的thread里面运行,不会有多线程问题
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            protected boolean isHeldExclusively() {
                return getState() != 0;//true:有人获取锁,false:没人获取锁
            }
            
            //worker类继承自AQS并实现了自己的加锁解锁方法,不可重入的锁。
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {//尝试获取锁
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;//排AQS队
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);//state初始等于-1,
                return true;
            }
    
            public void lock() throws InterruptedException {//会去排队
                acquire(1);
            }
    
            public boolean tryLock() {//不会去排队
                return tryAcquire(1);
            }
    
            public void unlock() {
                release(1);//释放锁,并且唤醒head中的第一个
            }
    
            public boolean isLocked() {
                return isHeldExclusively();
            }
            
            //中断worker关联线程
            void interruptIfStarted() {
                Thread t;
                //state=-1不能中断,说明还没有运行起来
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
        /* 
         1.advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN=00000000000
         第一个条件c>=0000000,说明ctl不是在正常的增加(正常增加都是小于0),c=SHUTDOWN000xxxx|STOP001xxx|TIDYING010xxx|TERMINATED011xxx返回true。就什么都不做。
         c=RUNNING111xxxxx就返回false,第二个条件吧ctl变成000xxxxx
         也就是说:SHUTDOWN000xxxxx|STOP001xxxxx|TIDYING010xxxxx|TERMINATED011xxxxx不能变为shutdown状态,running111xxxxx状态可以变为shutdown状态。
         
         2.advanceRunState(STOP);//更新状态为STOP00100000000
          第一个条件c>=00100000000,说明ctl已经不正常了,c=TIDYING010xxxxx|TERMINATED011xxxxx|STOP001xxxxx,返回true。就什么都不做。
         c=RUNNING111xxxxx|SHUTDWON000xxxxx返回false。第二个条件吧ctl变成001xxxxx
         也就是说:TIDYING010|TERMINATED011|STOP001不能变为STOP状态,running状态|SHUTDOWN可以变为STOP状态。
         */
        //状态是不可逆的,如果跑到前面的状态了,就不动,否则修改。
        private void advanceRunState(int targetState) {//更新状态
            for (;;) {
                int c = ctl.get(); 
                //c>=targetState就不修改状态。c<targetState就去修改状态。
                if (runStateAtLeast(c, targetState) || 
                        //ctlOf(状态值,数量值)进行或操作,就是修改状态,但是不改变数量
                        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }
    
        //尝试终止线程池
        final void tryTerminate() {
            for (;;) {//死循环
                int c = ctl.get();
                
                //以下两种情况终止线程池,其他情况直接返回:
                //1.状态为stop
                //2.状态为shutdown且任务队列为空
                if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))//SHUTDWON但是队列不空,不停止线程池
                    return;
                
                //false&&false&&(false&&):c>=0&&c<010&&c!=0:c=STOP状态
                //false&&false&&(true&&false):c>=0&&c<010&&c=0&&队列空:c=SHUTDOWN状态队列空
                
                //若线程不为空则中断一个闲置线程后直接返回
                if (workerCountOf(c) != 0) { // worker数量不等于0
                    interruptIdleWorkers(ONLY_ONE);//中断线程,是否只中断一次
                    return;
                }
                
                //worker数量等于0,并且  处于stop状态或者SHUTDOWN状态队列空
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//设置状态为TIDYING=010
                        try {//线程池终止后做的事情
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));//设置状态为TERMINATED=011
                            termination.signalAll();//唤醒条件队列所有线程
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                //状态设置失败则再重试
            }
        }
        /*
         调用该方法来尝试终止线程池,在进入for循环后第一个if判断过滤了不符合条件的终止操作,只有状态为stop,
         或者状态为shutdown且任务队列为空这两种情况才能继续执行。
         第二个if语句判断工作者数量是否为0,不为0的话也直接返回。经过这两重判断之后才符合终止线程池的条件,
         于是先通过CAS操作将线程池状态设置为tidying状态,在tidying状态会调用用户自己实现的terminated()方法来做一些处理。
         到了这一步,不管terminated()方法是否成功执行最后都会将线程池状态设置为terminated,也就标志着线程池真正意义上的终止了。
         最后会唤醒所有等待线程池终止的线程,让它们继续执行
         */
    
    
        private void checkShutdownAccess() {
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkPermission(shutdownPerm);
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    for (Worker w : workers)
                        security.checkAccess(w.thread);//检查每一个线程的权限。线程池的线程全在workers里面的worker里面的thread上面。
                } finally {
                    mainLock.unlock();
                }
            }
        }
    
        //中断所有线程,即使是活动的。忽略SecurityExceptions(在这种情况下,某些线程可能保持不间断)。
        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();//中断workers的线程,加锁防止workers变化。
            } finally {
                mainLock.unlock();
            }
        }
    
        
        private void interruptIdleWorkers(boolean onlyOne) {//中断worker的线程
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {//没有中断并且,获取锁成功,tryLock不会阻塞排队,只会把state=0变成1,
                        //前提是没有线程获取w里面的锁。就可以中断w里面的线程。 其他线程要想操作w就要先获取锁。
                        //worker每处理一个任务,会加锁一次解锁一次。
                        try {
                            t.interrupt();//Worker的线程中断,加锁防止workers变化。
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
        //中断空闲worker。中断可以获取锁的worker里面的线程,就是worker w还没有调用lock方法的worker。
        private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
        }
    
        private static final boolean ONLY_ONE = true;
    
    
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    
         
        void onShutdown() {
        }
        
    
        //RUNNING返回true,SHUTDOWN并且shutdownOK=true返回true,SHUTDOWN并且shutdownOK=false返回false,
        final boolean isRunningOrShutdown(boolean shutdownOK) {
            int rs = runStateOf(ctl.get());
            return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
        }
    
        
        private List<Runnable> drainQueue() {
            BlockingQueue<Runnable> q = workQueue;
            ArrayList<Runnable> taskList = new ArrayList<Runnable>();
            q.drainTo(taskList);
            if (!q.isEmpty()) {
                for (Runnable r : q.toArray(new Runnable[0])) {
                    if (q.remove(r))//从workqueeu中移除
                        taskList.add(r);//添加到taskList
                }
            }
            return taskList;
        }
        
        /*addWorker本事只是为线程池添加一个Worker,其本身所做的事情其实很简单,但难就难在要确保安全有效得添加一个Worker。
        为此addWorker()方法做了很多额外的工作。比如判断线程池的运行状态,当前Worker数量是否已经饱和等等。可以发现在这个方法,
        或者说整个ThreadPoolExecutor中,很多时候都是使用双重检查的方式来对线程池状态进行检查。其实这都是为了效率,
        最简单不过直接使用Synchronized或ReentranLock进行同步,但这样效率会低很多,所以在这里,
        只有在万不得已的情况下,才会使用悲观的ReentranLock。*/
        
        //添加任务,executorService1.execute(new Runnable() = firstTask),任意个线程并发,
        private boolean addWorker(Runnable firstTask, boolean core) {//core是不是核心线程
            
            //两层循环,外层循环判断线程池状态,状态不符合就return,内层循环判断线程数,线程数超过限定值return。
            
            retry: for (;;) {
                int c = ctl.get(); 
                int rs = runStateOf(c);//前3位不变,后面29位=0。
                //c处于RUNNING状态,c=111xxxxxxxxx,rs=111 0000000000000000
                //c处于SHUTDOWN状态,c=000xxxxxxxxx,rs=000 0000000000000000
                //c处于STOP状态,c=001xxxxxxxxx,rs=001 0000000000000000
                //c处于TIDYING状态,c=010xxxxxxxxx,rs=010 0000000000000000
                //c处于TERMINATED状态,c=011xxxxxxxxx,rs=011 0000000000000000
                
                //状态判断。rs后面全是0,ctl后面不是0。
    
                //rs正常运行小于0,SHUTDOWN=0,其他STOP,TIDYING,TERMINATED都是大于0
                
                //true&&!(false):rs>=0(不是RUNNING状态)&& rs != SHUTDOWN=000:rs>0处于STOP,TIDYING,TERMINATED,就不新开worker线程了//true&&!(true&&false):rs>=0&&rs=0&&firstTask!=null:SHUTDOWN状态&&有第一个任务,
                //true&&!(true&&true&&false):rs>=0&&rs=0&&firstTask=null&&workQueue=null:SHUTDOWN状态&&没有第一个任务&&队列空,任务没有队列也空了就不用新开worker线程了,
                if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                    return false;
    
                //只有以下两种情况会继续添加线程
                //1.false:rs<0(处于RUNNING状态),
                //2.true&&!(true&&true&&true):rs>=0&&rs=0&&firstTask=null&&workQueue!=null:shutdown状态,首任务空,队列还有任务。其他地方调用addWorker(null,true|false),队列还有任务要开worker线程。
                for (;;) {
                    int wc = workerCountOf(c);//已经运行的worker线程,worker线程数目大于CAPACITY=2^29-1就不嗯能够再加worker线程数目了。
                    
                    //以下三种情况不继续添加线程:
                    //1.线程数大于线程池总容量
                    //2.当前线程为核心线程,且核心线程数达到corePoolSize
                    //3.当前线程非核心线程,且总线程达到maximumPoolSize
                    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;//不开worker线程,加到队列里面去,
                    if (compareAndIncrementWorkerCount(c))//ctl加一成功
                        break retry;//加1成功就退出
                    c = ctl.get(); //ctl加一失败,
                    if (runStateOf(c) != rs)//状态没变重新加1。
                        continue retry;//状态变了重新获取状态。
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //executorService1.execute(new Runnable() = firstTask),Worker里面有一个firstTask和Thread,Thread里面有这个Worker w。
                w = new Worker(firstTask);//firstTask可以为null
                //Worker里面封装了这个任务,并且实例化了一个线程,总共3个Worker,3个任务(3个第一个任务),3个线程。线程run时候就开一个线程去执行Worker里的第一个任务,线程Thread run的时候,是Thread
                //里面的Runnable去run,所以Worker要设置成这个Thread的Runnable,然后让Worker去run(转调外部类的run方法,把worker传进去)。Worker里面仅仅保存的是这个worker的第一个任务,第一个任务执行完会死循环执行queue队列的任务。
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();//单线程进来,其余阻塞
                    try {
                        // 保持锁止时重新检查。如果线程发生故障或在获取锁之前关闭,请退出。
                        int rs = runStateOf(ctl.get());
                        //true|:running状态,增加worker
                        //false|true:shutdown状态并且task=null,增加worker
                        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) //如果线程已经开启则抛出异常
                                throw new IllegalThreadStateException();
                            workers.add(w);//Worker加到workers里面去,多线程并发要加锁
                            int s = workers.size();
                            if (s > largestPoolSize)//记录线程达到的最大值
                                largestPoolSize = s;
                            workerAdded = true;//添加成功
                        }
                        //false|(fasle|):STOP|TIDYING|TERMINATER:不增加worker
                        //false|(true|false):SHUTDOWN,task不为null:不增加worker
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {//添加成功
                        t.start();//线程123执行start(),就会去执行Worker的run方法。
                        workerStarted = true;//启动成功
                    }
                }
            } finally {
                if (!workerStarted)//线程启动成功
                    addWorkerFailed(w);//新建worker失败
            }
            return workerStarted;//是否启动成功
        }
    
        //新建worker失败
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;//加锁
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);//workers中移除
                decrementWorkerCount();//减少ctl
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
        //工作线程如果从getTask方法中获得null,则会退出while循环并随后执行processWorkerExit方法。移除自己。completedAbruptly=fasle没有异常,
        //该方法会在这个工作线程终止之前执行一些操作:统计该工作者完成的任务数,然后将其从workers集合中删除,每删除一个工作者之后都会去调用tryTerminate方法尝试终止线程池,但并不一定会真的终止线程池。
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) //若非正常完成则将线程数减为0
                decrementWorkerCount();//ctl减1
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;//统计完成的任务总数
                workers.remove(w);//移除work
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();//尝试终止线程池
            /*
              从tryTerminate方法返回后再次去检查一遍线程池的状态,如果线程池状态为running或者shutdown,
              并且线程数小于最小值,则恢复一个工作者。这个最小值是怎样计算出来的呢?
              我们来看看。如果allowCoreThreadTimeOut为true则最小值为0,否则最小值为corePoolSize。
              但还有一个例外情况,就是虽然允许核心线程超时了,但是如果任务队列不为空的话,那么必须保证有一个线程存在,因此这时最小值设为1
              后面就是判断如果工作线程数大于最小值就不新增线程了,否则就新增一个非核心线程。
              从这个方法可以看到,每个线程退出时都会去判断要不要再恢复一个线程,因此线程池中的线程总数也是动态增减的。
             */
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {//SHUTDOWN|RUNNING,则将线程数恢复到最小值
                if (!completedAbruptly) {//线程正常完成任务被移除
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//允许核心线程超时最小值为0, 否则最小值为核心线程数
                    if (min == 0 && !workQueue.isEmpty())//如果任务队列还有任务, 则保证至少有一个线程
                        min = 1;
                    if (workerCountOf(c) >= min)//若线程数大于最小值则不新增了
                        return;  
                }
                addWorker(null, false);//新增工作线程
            }
        }
    
        private Runnable getTask() {//不是worker的方法,corePoolSize个多线程并发访问,
            boolean timedOut = false; //上一次获取任务是否超时
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);//rs是一个值,ctl是多个值。
                
                //if判断,从这里我们可以看到,如果线程池状态为shutdown,会继续消费任务队列里面的任务;如果线程池状态为stop,则停止消费任务队列里剩余的任务。
                
                //true&&(true|): rs=STOP|TIDYIN|TERMINATED
                //true&&(false|true): rs=SHUTDOWN并且队列空。SHUTDOWN了还会去执行队列。
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();//ctl-1,销毁当前线程
                    return null;//这个线程  退出这个for循环和外部的while循环,这个worker线程退出终止,执行完。
                }
    
                //false:rs=RUNNING:
                //true&&(false|false):rs=SHUTDOWN并且队列不为空:每个线程都不会终止,继续处理队列的任务。
                int wc = workerCountOf(c);
    
                // 是否开始超时等待:1.允许核心线程超时,2.线程数大于corePoolSize
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                //是否退出当前线程,
                if (    ( wc > maximumPoolSize || (timed && timedOut) ) 
                        && ( wc > 1 || workQueue.isEmpty() )) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;//终止当前线程
                    continue;
                }
    
                //不退出当前线程:
                try {//会阻塞等到队列有任务,超时等待返回空,继续死循环。
                    //注意:闲置线程会一直在这阻塞
                    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true; //是因为超时退出阻塞的,
                } catch (InterruptedException retry) {
                    timedOut = false;//不是因为超时退出阻塞的,
                }
            }
        }
    
        final void runWorker(Worker w) throws InterruptedException {//不是worker的方法,是外部类的方法,会有corePoolSize个多线程访问问题,
            Thread wt = Thread.currentThread();
            Thread t = w.thread;//相等的
            
            Runnable task = w.firstTask;//第一个任务,不是队列的任务
            w.firstTask = null;
            //设置work的state=0和去掉owenerThread属性
            //Worker也是一个ReentantLock,但是3个线程,每个线程一个woker,woker w不是线程共享的锁,不会多线程获取这把锁,unlock()不会有多线程访问,
            w.unlock(); //把state由-1变成0。interruptIdleWorkers方法就可以中断这个线程了。
            boolean completedAbruptly = true;//有异常
            try {
                //先执行初始是一个任务task,执行完之后从workQueue中取任务去执行。
                while (task != null || (task = getTask()) != null) {//不断的从任务队列中获取任务,直到getTask方法返回null,然后工作线程退出while循环最后执行processWorkerExit方法来移除自己。
                    //就有多线程调用w.lock(),每个线程一个woker,woker w不是线程共享的锁,此处代码不会多线程获取这把锁,
                    //设置work的state=1和owenerThread属性
                    //如果shutdown方法里面的interrupt方法,调用了w.tryLock(),那么当前线程就会加入到w的队列,并且当前线程阻塞等待唤醒,唤醒之后继续这里执行。
                    //每次都使用锁以保证当前worker在运行task过程中不会被中断。
                    w.lock();
                     
                    //其他线程可以从workers中获取一个worker,其他线程没有获取到锁,就不能对这个线程中断。本线程的下面执行就不能被中断。
                    
                    //ctl大于等于STOP:STOP|TIDYING|TERMINATED 
                    if (   (       runStateAtLeast(ctl.get(), STOP) 
                               || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
                           )
                           && !wt.isInterrupted()
                       )
                        wt.interrupt();//线程中断
                    try {
                        beforeExecute(wt, task);//正常执行,就继续运行。
                        Throwable thrown = null;
                        try {
                            task.run();//外部添加的任务run
                        } catch (RuntimeException x) {
                            thrown = x;//收集异常,给afterExecute
                            throw x;
                        } catch (Error x) {
                            thrown = x;//收集异常,给afterExecute
                            throw x;
                        } catch (Throwable x) {
                            thrown = x;//收集异常,给afterExecute
                            throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);//异不异常都会处理,thrown来区分。afterexecute这也可能引发一个异常,
                        }
                    } finally {
                        task = null;//将执行完的任务置空 
                        w.completedTasks++;//将完成的任务数加一
                        w.unlock();
                    }
                }
                completedAbruptly = false;//正常完成,没有异常
            } finally {
                processWorkerExit(w, completedAbruptly);//异不异常都会处理,completedAbruptly来区分。移除自己。
            }
        }
        
        public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors1.defaultThreadFactory(),
                    defaultHandler);
        }
        
        public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
        }
        
        public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, RejectedExecutionHandler1 handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
        }
        
        public ThreadPoolExecutor1(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler1 handler) {
            if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
        //这个任务有可能新建线程执行,有可能在已经存在的线程里面去执行。线程池已经shutdown了或者池子满了就丢弃任务,
        public void execute(Runnable command) {
            //executorService1.execute(new Runnable() = firstTask),会有多线程访问。任意多个线程调用同一个线程池executorService.execute方法。
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get(); 
            int m;
            /*于corepoolsize就新建线程,大于sorepoolsize就加入队列,队列满了但是小于maxsize就新建线程,大于maxsize就丢弃任务。*/
            //最大开corePoolSize=3个线程,来一个新建一个Worker添加到workers里面去然后Worker.start(),最多添加3个。workers变成3个之后不会较小,一直不变。
            //后面进来的添加到workQueue里面去,前面3个线程只有执行完了就会去阻塞等到队列有任务。前面3个线程一直不会退出。
            
            if ( (m = workerCountOf(c)) < corePoolSize) {//任意多个线程可以并发访问。当前 运行的线程 数量是否少于corePoolSize。
                if (addWorker(command, true))//添加到workers里面去,然后start。创建一个新的工作线程来执行任务。
                    return;
                c = ctl.get();
            }
            
            //上面代码任意个线程并发,下面代码超过corePoolSize的线程并发。若队列已满则返回false。
            if (isRunning(c) && workQueue.offer(command)) {//c<0(运行状态)并且任务加到队列成功,3个核心的线程刚才在阻塞等待workQueue现在队列有元素了,会唤醒去处理
                
                //在成功将任务放入到任务队列后,还会再次检查线程池是否是Running状态,如果不是则将刚刚添加的任务从队列中移除,然后再执行拒绝策略。
                int recheck = ctl.get();//这里进行再次检查状态,
                
                //!true:还是处于正常运行状态:不移除, 
                //!fasle:不处于运行态,queue中移除command并且tryTerminate()成功移除后再执行拒绝策略
                
                if (!isRunning(recheck) && remove(command))
                    reject(command);//丢弃任务,如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。
                
                //处于运行状态,task添加到队列中,没有worker线程,
                //若线程数为0则新建一个worker。稍后这个空闲的worker就会自动去队列里面取任务来执行
                //如果从队列中移除任务失败,则再检查一下线程数是否为0(有可能刚好全部线程都被终止了),是的话就新建一个非核心线程去处理。
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);//开一个线程但是不执行初始任务,等着执行队列任务
                
            //不是运行状态,或者队列添加失败,拒绝策略拒绝task
            //如果任务队列已经满了,此时offer方法会返回false,添加到队列失败了,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。如果这个线程创建失败,则最后会执行拒绝策略
            } else if (!addWorker(command, false))
                reject(command); 
        }
    
        //平缓关闭线程池
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;//获取锁,新建不了worker
            mainLock.lock();
            try {
                checkShutdownAccess();//检查是否有关闭的权限
                advanceRunState(SHUTDOWN);//更新状态为SHUTDOWN,这时线程池会拒绝接收外部传过来的任务,
                interruptIdleWorkers();//中断闲置的线程,还没有执行lock的worker w的线程,剩余的线程会继续消费完任务队列里的任务之后才会终止。
                onShutdown();  //对外提供的钩子
            } finally {
                mainLock.unlock();
            }
            tryTerminate();//尝试终止线程池
        }
    
        //立刻关闭线程池
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();//检查是否有关闭的权限
                advanceRunState(STOP);//更新状态为stop,这是线程池也不再接收外界的任务
                interruptWorkers();//中断所有工作线程
                tasks = drainQueue();//排干任务队列
            } finally {
                mainLock.unlock();
            }
            tryTerminate();//尝试终止线程池
            return tasks;//最后返回未被处理的任务集合。
        }
        //调用shutdown()和shutdownNow()方法后还未真正终止线程池,这两个方法最后都会调用tryTerminate()方法来终止线程池。
        
        public boolean isShutdown() {
            return !isRunning(ctl.get());
        }
    
        //ctl>=0并且小于0110000000000,ctl=SHUTDOWN|STOP|TIDYING
        public boolean isTerminating() {
            int c = ctl.get();
            return !isRunning(c) && runStateLessThan(c, TERMINATED);
        }
    
        //ctl>=TERMINATED
        public boolean isTerminated() {
            return runStateAtLeast(ctl.get(), TERMINATED);
        }
    
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                    if (runStateAtLeast(ctl.get(), TERMINATED))//状态>=011,表示已经处于TERMINATED
                        return true;
                    if (nanos <= 0)
                        return false;
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
    
        protected void finalize() {
            SecurityManager sm = System.getSecurityManager();
            if (sm == null || acc == null) {
                shutdown();
            } else {
                PrivilegedAction<Void> pa = () -> {
                    shutdown();
                    return null;
                };
                AccessController.doPrivileged(pa, acc);
            }
        }
    
        public void setThreadFactory(ThreadFactory threadFactory) {
            if (threadFactory == null)
                throw new NullPointerException();
            this.threadFactory = threadFactory;
        }
    
        public ThreadFactory getThreadFactory() {
            return threadFactory;
        }
    
        public void setRejectedExecutionHandler(RejectedExecutionHandler1 handler) {
            if (handler == null)
                throw new NullPointerException();
            this.handler = handler;
        }
    
        public RejectedExecutionHandler1 getRejectedExecutionHandler() {
            return handler;
        }
    
        public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();//中断所有可以获取锁的worker
            else if (delta > 0) {
                //我们不知道“需要”多少新线程。作为一种启发式方法,预启动足够多的新工作人员(最多新的核心大小)
                //来处理队列中当前的任务数,但如果队列在执行此操作时变为空,则停止。
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {//开k个worker线程
                    if (workQueue.isEmpty())//队列空就停止
                        break;
                }
            }
        }
    
        public int getCorePoolSize() {
            return corePoolSize;
        }
    
        //worker线程数量小于corePoolSize。就开一个空初始任务的Worker和线程。
        public boolean prestartCoreThread() {
            return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
        }
    
        //与pretartcorethread相同,只是安排至少启动一个线程,即使corepoolsize为0。
        void ensurePrestart() {
            int wc = workerCountOf(ctl.get());
            if (wc < corePoolSize)
                addWorker(null, true);
            else if (wc == 0)
                addWorker(null, false);
        }
    
        //启动3个worker线程去执行,有任务进来,就不初始化线程,直接都给队列,然后这些线程从队列取任务。
        //饿初始化线程池的线程,不是懒初始化,等到任务来了才初始化线程。
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }
    
        public boolean allowsCoreThreadTimeOut() {
            return allowCoreThreadTimeOut;
        }
    
        public void allowCoreThreadTimeOut(boolean value) {
            if (value && keepAliveTime <= 0)
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            if (value != allowCoreThreadTimeOut) {
                allowCoreThreadTimeOut = value;
                if (value)//true,启用超时
                    interruptIdleWorkers();
            }
        }
    
        public void setMaximumPoolSize(int maximumPoolSize) {
            if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
                throw new IllegalArgumentException();
            this.maximumPoolSize = maximumPoolSize;
            //已经开的worker数量大于maximumPoolSize就中断所有可以获取锁的worker的线程
            if (workerCountOf(ctl.get()) > maximumPoolSize)
                interruptIdleWorkers();
        }
    
        public int getMaximumPoolSize() {
            return maximumPoolSize;
        }
    
        public void setKeepAliveTime(long time, TimeUnit unit) {
            if (time < 0)
                throw new IllegalArgumentException();
            if (time == 0 && allowsCoreThreadTimeOut())
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            long keepAliveTime = unit.toNanos(time);
            long delta = keepAliveTime - this.keepAliveTime;
            this.keepAliveTime = keepAliveTime;
            if (delta < 0)
                interruptIdleWorkers();
        }
    
        public long getKeepAliveTime(TimeUnit unit) {
            return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
        }
    
        public BlockingQueue<Runnable> getQueue() {
            return workQueue;
        }
    
        public boolean remove(Runnable task) {
            boolean removed = workQueue.remove(task);//workQueue队列中移除task
            tryTerminate(); // In case SHUTDOWN and now empty
            return removed;
        }
    
        /**
        尝试从工作队列中删除所有已取消的@link future任务。
        此方法可作为存储回收操作使用,对功能没有其他影响。
        被取消的任务永远不会执行,但可能会累积到工作队列中,直到工作线程可以主动删除它们。
        调用此方法将尝试立即删除它们。但是,此方法可能无法在存在其他线程干扰的情况下删除任务。
         */
        public void purge() {
            final BlockingQueue<Runnable> q = workQueue;
            try {
                Iterator<Runnable> it = q.iterator();
                while (it.hasNext()) {
                    Runnable r = it.next();
                    if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
                        it.remove();
                }
            } catch (ConcurrentModificationException fallThrough) {
                // Take slow path if we encounter interference during traversal.
                // Make copy for traversal and call remove for cancelled entries.
                // The slow path is more likely to be O(N*N).
                for (Object r : q.toArray())
                    if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
                        q.remove(r);
            }
    
            tryTerminate(); // In case SHUTDOWN and now empty
        }
    
        public int getPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Remove rare and surprising possibility of
                // isTerminated() && getPoolSize() > 0
                return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        public int getActiveCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int n = 0;
                for (Worker w : workers)
                    if (w.isLocked())
                        ++n;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        public int getLargestPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                return largestPoolSize;
            } finally {
                mainLock.unlock();
            }
        }
    
        public long getTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers) {
                    n += w.completedTasks;
                    if (w.isLocked())
                        ++n;
                }
                return n + workQueue.size();
            } finally {
                mainLock.unlock();
            }
        }
    
        //返回已完成执行的任务的大致总数。 近似值,在连续调用中不会减少。
        public long getCompletedTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers)
                    n += w.completedTasks;
                return n;
            } finally {
                mainLock.unlock();
            }
        }
    
        public String toString() {
            long ncompleted;
            int nworkers, nactive;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                ncompleted = completedTaskCount;
                nactive = 0;
                nworkers = workers.size();
                for (Worker w : workers) {
                    ncompleted += w.completedTasks;
                    if (w.isLocked())
                        ++nactive;
                }
            } finally {
                mainLock.unlock();
            }
            int c = ctl.get();
            String rs = (runStateLessThan(c, SHUTDOWN) ? "Running"
                    : (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down"));
            return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive
                    + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]";
        }
    
        protected void beforeExecute(Thread t, Runnable r) {}
    
        protected void afterExecute(Runnable r, Throwable t) {}
    
        protected void terminated() {}// Executor中断调用 
    
        public static class CallerRunsPolicy implements RejectedExecutionHandler1 {
            public CallerRunsPolicy() {
            }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
                if (!e.isShutdown()) {//池子关闭了,就丢弃这个任务,什么都不执行就是丢弃。
                    r.run();//池子没有关闭,在调用者线程执行这个任务,不再使用线程池的线程来执行任务。
                }
            }
        }
    
        public static class DiscardPolicy implements RejectedExecutionHandler1 {
            public DiscardPolicy() {
            }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
            }//什么都不执行就是丢弃任务
        }
    
        public static class DiscardOldestPolicy implements RejectedExecutionHandler1 {
            public DiscardOldestPolicy() {
            }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor1 e) {
                if (!e.isShutdown()) {//池子没有关闭
                    e.getQueue().poll();//移除队列中的第一个元素,丢弃阻塞队列中靠最前的任务
                    e.execute(r);//把这个任务丢进去
                }//池子关闭了什么都不做丢弃
            }
        }
    }
    public class ThreadPoolExample1 {
       
        public static void main(String[] args) {
            
            ThreadPoolExecutor1 executorService1 = new ThreadPoolExecutor1(3, 3, 6L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            
    //        executorService1.prestartAllCoreThreads();
            
    //        ThreadPoolExecutor1 executorService1 = (ThreadPoolExecutor1) Executors1.newCachedThreadPool();
    //      ExecutorService executorService2 = Executors1.newSingleThreadExecutor();
            
            new Thread(() -> {//外部线程,不是线程池线程,丢任务给池子。
                int i = 0;
                while (true) {
                    Threadd w = new Threadd(executorService1,"子线程"+(++i));//外部线程,不是线程池线程,丢任务给池子。
                    w.start();
                }
            },"发射线程").start();
            
            executorService1.shutdown();
        }
        static int j =0; 
        static class Threadd extends Thread {//外部线程,不是线程池线程
            ThreadPoolExecutor1 executorService1;
            Threadd(ThreadPoolExecutor1 executorService1,String name) {
                super(name);
                this.executorService1 = executorService1;
            }
            @Override
            public void run() {//多个线程(不是main线程)来调用executorService.execute()来丢任务给池子。
                try {
                    executorService1.execute(new Runnable() {//execute的参数是一个任务
                        String name = "任务"+(j++);
                        @Override
                        public void run() {
                            System.out.println(name+"完成");
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 相关阅读:
    Log4j的配置
    Linux笔记
    面对一个个路口
    切图布局知识点(四)——不定宽有背景文字块居中
    切图布局知识点(三)——左右布局
    切图布局知识点(二)——高度100%
    切图布局知识点(一)
    window下静默执行python脚本
    mysql 允许远程连接
    linux 防火墙
  • 原文地址:https://www.cnblogs.com/yaowen/p/11378173.html
Copyright © 2011-2022 走看看