zoukankan      html  css  js  c++  java
  • 史前最详细的:Java线程池核心流程解析

    1. 前言

    线程池是JAVA开发中最常使用的池化技术之一,可以减少线程资源的重复创建与销毁造成的开销。

    2. 灵魂拷问:怎么做到线程重复利用?

    很多同学会联想到连接池,理所当然的说:需要的时候从池中取出线程,执行完任务再放回去。

    如何用代码实现呢?

    此时就会发现,调用线程的start方法后,生命周期就不由父线程直接控制了。线程的run方法执行完成就销毁了,所谓的“取出”和“放回”只不过是想当然的操作。

    这里先说答案:生产者消费者模型

    3. ThreadPoolExecutor的实现

     

    3.1 结构

    首先看下ThreadPoolExecutor的继承结构

    顶级接口是Executor,定义execute方法

    ExecutorService添加了submit方法,支持返回future获取执行结果,以及线程池运行状态的相关方法

    本文着重讲线程池的执行流程,因此将暂时忽略线程池的状态相关的代码,也建议新手看源码时从核心流程看起。

    3.2 核心方法:execute()

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 判断是否小于核心线程数
            if (workerCountOf(c) < corePoolSize) {
                //添加worker,添加成功则退出
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            // 核心线程数已用完则放入队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 双重检查,避免入队完成后,所有线程已销毁,导致没有消费者消费当前任务
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 队列已满则开启非核心线程,达到最大线程数则使用拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }

    execute方法就是一个生产的过程,主要分为开启线程和入队

    开启线程会传入command(即当前任务),开启的线程会立即消费该任务

    入队的任务则会由Worker消费

    主要关注corePoolSize,maximumPoolSize,queueSize(任务队列长度),workerCount(当前worker数量)这几个参数,可以总结为以下:

    3.2 消费者:Worker

     

    Worker类实现Runnable接口,继承AQS,主要先关注thread和firstTask两个属性和run方法

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    从Worker的构造方法可以看出,thread就是线程池中真正消费任务的线程,创建时会传入this(即Worker对象),而Worker实现了Runnable,因此线程运行时就是执行了Worker的run方法。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 重点关注
            while (task != null || (task = getTask()) != null) {
                // ···
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 重点关注
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 重点关注
                    task = null;
                    // ···
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    再来看run方法,直接调用了ThreadPoolExecutor的runWorker方法,runWorker中有一个while循环,循环体执行了task.run方法

    task首先会获取Worker中的firstTask属性,并将其置为null,因此firstTask只会执行一次,后续task将通过getTask方法获取。

    因此Worker的工作流程可以概括为:消费完Worker的firstTask后,循环执行getTask获取任务并消费,获取不到task时,就退出循环,线程销毁。

    此处便可以看出生产者消费者模型了。

    private Runnable getTask() {
        boolean timedOut = false;
        for (;;) {
            int c = ctl.get();
            // ···
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    // 此处返回null,runWorker将退出循环
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    runWorker方法退出循环的条件是getTask返回null。

    观察getTask,只有同时满足以下情况时才会返回null

    返回的task是通过workQueue.poll和workQueue.take得到的

    两者执行时线程均会挂起,直至workQueue中有新的任务

    不同之处在于poll方法阻塞keepAliveTime时间后会自动唤醒并返回null,此时timeOut置为true,即满足条件1

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // ···
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                // ···
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // ···
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // ···
        }
        return workerStarted;
    }

    了解了Worker之后,再来看execute中调用的addWorker方法

    方法有两个参数,firstTask即为Worker的firstTask,core则标记需要新增的是否是核心线程

    retry循环与线程池状态相关,内层for循环则是重复尝试cas增加线程,若大于corePoolSize或者maximumPoolSize则新增失败,cas成功后,new一个Worker并start

    3.3 总结

    回到最初的问题,线程是如何做到重复利用的?

    并不存在取出线程使用完再归还的操作,线程启动后进入循环,主动获取任务执行,退出循环则线程销毁。

    execute方法控制新增Worker和任务入队

    附:手写简易线程池

    public class MyThreadPool implements Executor {
    
        private int corePoolSize;
        private int maximumPoolSize;
        private BlockingQueue<Runnable> queue;
        //记录当前工作线程数
        private AtomicInteger count;
        private long keepAliveTime;
        private RejectHandler rejectHandler;
    
        public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> queue, long keepAliveTime, RejectHandler rejectHandler) {
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.queue = queue;
            this.keepAliveTime = keepAliveTime;
            this.rejectHandler = rejectHandler;
            count = new AtomicInteger(0);
        }
    
        @Override
        public void execute(Runnable task) {
            int ct = count.get();
            //核心线程数未满,尝试增加核心线程
            if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) {
                new Worker(task).start();
                return;
            }
            //入队
            if (queue.offer(task)) {
                return;
            }
            //队列已满,尝试增加非核心线程
            if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) {
                new Worker(task).start();
                return;
            }
            //已达最大线程数,拒绝
            rejectHandler.reject(task);
    
        }
    
        class Worker extends Thread {
    
            Runnable firstTask;
    
            public Worker(Runnable firstTask) {
                this.firstTask = firstTask;
            }
    
            @Override
            public void run() {
                Runnable task = firstTask;
                firstTask = null;
                while (true) {
                    try {
                        //getTask会阻塞
                        if (task != null || (task = getTask()) != null) {
                            task.run();
                        } else {
                            //getTask超时才会进入,直接退出,线程销毁
                            break;
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        //置空,否则不能getTask
                        task = null;
                    }
                }
            }
    
        }
    
        Runnable getTask() throws InterruptedException {
            //标记是否超时过
            boolean timedOut = false;
            while (true) {
                int ct = count.get();
                //超出核心线程数才进入超时逻辑,即使timeOut由于线程poll超时过一次变成true,执行到这里如果不超出corePoolSize,可以再次进入take分支
                if (ct > corePoolSize) {
                    //超出核心线程数
                    if (timedOut) {
                        //已超时过,尝试减少工作线程数,失败会continue,然后重新比较corePoolSize,重试减少线程数
                        if (count.compareAndSet(ct, ct - 1)) {
                            return null;
                        } else {
                            continue;
                        }
                    }
                    Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
                    if (task == null) {
                        //poll超时才进入
                        timedOut = true;
                        continue;
                    }
                    return task;
                } else {
                    //必然能获取到task
                    return queue.take();
                }
            }
        }
    
        public static interface RejectHandler {
    
            void reject(Runnable r);
    
        }
    
    
        public static void main(String[] args) {
            MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> {
                System.out.println(r + ": reject");
            });
            for (int i = 0; i < 3; i++) {
                final int x = i;
                new Thread(() -> {
                    for (int j = 0; j < 5; j++) {
                        final int y = j;
                        pool.execute(() -> {
                            try {
                                Thread.sleep(3000L);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            LocalDateTime now = LocalDateTime.now();
                            System.out.println(String.format("线程i=%s, j=%s,执行结束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME)));
                        });
                    }
                }).start();
    
            }
    
        }
    
    }

    干货来袭!阿里内部Java面试必备软/硬实力+大厂面经全都有

    有完整的Java初级,高级对应的学习路线和资料!专注于java开发。分享java基础、原理性知识、JavaWeb实战、spring全家桶、设计模式、分布式及面试资料、开源项目,助力开发者成长!


    欢迎关注微信公众号:码邦主

     

  • 相关阅读:
    用小程序&#183;云开发两天搭建mini论坛丨实战
    巧用小程序&#183;云开发实现邮件发送功能丨实战
    借助云开发,10行代码实现小程序支付功能!丨实战
    你的心事我全知晓——心情日记小程序丨实战
    只需20小时,让0基础的你掌握小程序云开发!这个暑假,约否?
    诗词歌赋,样样精通!诗词古语小程序带你领略魅力古风丨实战
    在线教育项目-day02【后台讲师管理模块】
    在线教育项目-day02【搭建项目结构】
    在线教育项目-day01【wrapper条件构造器】
    在线教育项目-day01【性能分析插件】
  • 原文地址:https://www.cnblogs.com/hehe199316/p/14481340.html
Copyright © 2011-2022 走看看