zoukankan      html  css  js  c++  java
  • java多线程源码解析

    一.interface Executor 最顶层接口

    1.1

    void execute(Runnable command);

    二.class Executors 为这些 Executor 提供了便捷的工厂方法。

    1.newFixedThreadPool  固定个数的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
        // 1.1 调用ThreadPoolExecutor
        return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());//1.2 使用linkedBlockingQueue
    }

    1.1 调用ThreadPoolExecutor()构造方法

    //1.1.1 参数说明
    public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue) {
    //1.1.2 调用另一个构造方法
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }

    1.1.1 参数说明*

    corePoolSize 核心线程数

    maximumPoolSize 最大线程数

    keepAliveTime 线程存活时间

    unit

    BlockingQueue 队列

    1.1.2 调用基础的构造方法

    public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    没什么好说的,就是一些参数校验,异常处理。

    1.2 linkedBlockingQueue

    看名字就知道,这是一个阻塞队列。 阻塞队列我会另写文章,此处仅继续往下走 不向上挖。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);// 1.2.1调用LinkedBlockingQueue(int) 构造方法
    }

    1.2.1调用LinkedBlockingQueue(int) 构造方法

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    很明显,构造了一个最大整型2的32次方-1的队列

    *对于这些参数的含义,我想应该在execute方法中去一探究竟。

    先说结论

      1.如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
      2.如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
      3.如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;(此时队列里面的任务还在队列里面,也就是跨过了队列里面的任务,创建了新的线程运行当前任务)
      4.如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();// 1.1
        if (workerCountOf(c) < corePoolSize) {// 当运行线程数小于corePoolSize
            if (addWorker(command, true)) // 1.2 addWorker 创建成功则返回,失败则继续
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 1.3 offer() 是向队列添加一个新元素,当队列满返回false,区别于add()也是添加一个新元素,队列满抛出unchecked异常
            int recheck = ctl.get();// 二次检查
            if (! isRunning(recheck) && remove(command))// 1.4recheck>0, remove 删除任务,返回boolean
                reject(command); //拒绝该任务
            else if (workerCountOf(recheck) == 0) //1.5 recheck=0时,
                addWorker(null, false);
        }
        else if (!addWorker(command, false))// 添加失败,拒绝
            reject(command);
    }

    1.1 ct1 为-2的29次方, -536870912

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int COUNT_BITS = Integer.SIZE - 3;
    public static final int SIZE = 32;

    1.2

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:// label标签,可以在循环嵌套时使用,可以在内层循环直接跳出外层循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty())) // 检查队列是否为空
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))// 当前运行线程大于等于容量,或者根据core,判断大于等于核心线程数or最大线程数
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    1.3 ctl.get()<0 且队列能成功添加一个新任务

    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    1.4 ctl.get>0 且队列能删除改任务 则拒绝任务

    1.5  ctl ==0 , addWorker(null, false)

    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    private static int workerCountOf(int c)  { return c & CAPACITY; }

     2.newSingleThreadExecutor  corePoolSize 和maximumPoolSize都为1 的线程池

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

     3.newCachedThreadPool  corePoolSize 为0,maximumPoolSize都为最大整型数的线程池,即都放在队列中

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());//注意此处为SynchronousQueue,看源码其实是创建了TransferStack()
    }

    4.newScheduledThreadPool   

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                new DelayedWorkQueue());//注意此处为DelayedWorkQueue,初始容量为16的阻塞延迟队列?
    }

    三.class ThreadPoolExecutor extends AbstractExecutorService 提供一个可扩展的线程池实现

    四.interface ExecutorService extends Executor  更广泛的接口

    五.abstract class AbstractExecutorService extends ExecutorService

  • 相关阅读:
    Java并发(十八):阻塞队列BlockingQueue
    web前端
    python学习总结:目录
    Django -- 5.路由层(URLconf)_基于Django1
    python:linux下字符串转换为JSON
    python:一秒中启动一个下载服务器
    Flask【第十二章】:Flask之Websocket,建立单聊群聊
    Flask【第十一章】:Flask中的CBV以及偏函数+线程安全
    Flask【第十章】:特殊装饰器 @app.before_request 和 @app.after_request 以及@app.errorhandler
    Flask【第九章】:Flask之蓝图
  • 原文地址:https://www.cnblogs.com/inspirationBoom/p/10750023.html
Copyright © 2011-2022 走看看