zoukankan      html  css  js  c++  java
  • 并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)

    前两篇文章讲了线程池的源码分析,再来看这篇文章就比较简单了, 本文主要讲解 Executors 这个工具类,看看长江创建线程池的几种方法。

    newFixedThreadPool

    • 生成一个固定大小的线程池:
    1 public static ExecutorService newFixedThreadPool(int nThreads) {
    2     return new ThreadPoolExecutor(nThreads, nThreads,
    3                                   0L, TimeUnit.MILLISECONDS,
    4                                   new LinkedBlockingQueue<Runnable>());
    5 }

    最大线程数设置为与核心线程数相等,则不会创建临时线程,创建的线程都是核心线程,线程也不会被回收。此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列,所以FixedThreadPool永远不会拒绝, 即饱和策略失效。

    过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

    newSingleThreadExecutor

    • 生成只有一个线程的固定线程池
    1 public static ExecutorService newSingleThreadExecutor() {
    2     return new FinalizableDelegatedExecutorService
    3         (new ThreadPoolExecutor(1, 1,
    4                                 0L, TimeUnit.MILLISECONDS,
    5                                 new LinkedBlockingQueue<Runnable>()));
    6 }

    这个更简单,和上面的一样,只要设置线程数为 1 就可以了。

    初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。

    由于使用了无界队列, 所以SingleThreadPool永远不会拒绝, 即饱和策略失效。

    由于newFixedThreadPool和SingleThreadPool都是使用的LinkedBlockingQueue,并且核心线程固定,如果此时并发有大量任务进行添加,线程处理速度过慢,将会全部添加到LinkedBlockingQueue中,此时会出现内存溢出。

    newCachedThreadPool

    • 生成一个需要的时候就创建新的线程
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue,所以创建的线程都是临时线程,都可以被回收。

    这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。

     1 int c = ctl.get();
     2 // corePoolSize 为 0,所以不会进到这个 if 分支
     3 if (workerCountOf(c) < corePoolSize) {
     4     if (addWorker(command, true))
     5         return;
     6     c = ctl.get();
     7 }
     8 // offer 如果有空闲线程刚好可以接收此任务,那么返回 true,否则返回 false
     9 if (isRunning(c) && workQueue.offer(command)) {
    10     int recheck = ctl.get();
    11     if (! isRunning(recheck) && remove(command))
    12         reject(command);
    13     else if (workerCountOf(recheck) == 0)
    14         addWorker(null, false);
    15 }
    16 else if (!addWorker(command, false))
    17     reject(command);

    过程分析:我把 execute 方法的主体粘贴过来,让大家看得明白些。鉴于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 SynchronousQueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker,第一个worker执行完后就getTask()从队列中取任务。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中把当前任务给正在等待的worker,如果没有空闲的线程在等待取任务,就是和第一个任务一样,进到最后的 else if 分支创建worker。

    我们来仔细分析下代码,第一次添加任务时,执行到第9行 workQueue.offer(command),我把以前文章里面的offer()代码贴过来,如果有感兴趣的可以去看看《并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析

    1 public boolean offer(E e) {
    2     if (e == null) throw new NullPointerException();
    3     return transferer.transfer(e, true, 0) != null;
    4 }
     1 /**
     2  * Puts or takes an item.
     3  */
     4 Object transfer(Object e, boolean timed, long nanos) {
     5 
     6     QNode s = null; // constructed/reused as needed
     7     boolean isData = (e != null);
     8 
     9     for (;;) {
    10         QNode t = tail;
    11         QNode h = head;
    12         if (t == null || h == null)         // saw uninitialized value
    13             //说明还没有初始化,则跳出继续循环,直至初始化完成
    14             continue;                       // spin
    15 
    16         // 走到这里,说明已经初始化完成,但是初始化时head = h;tail = h;head和tail都是相同的空节点
    17         // 如果h == t为false,则判断t.isData == isData,判断队尾节点和当前节点类型是否一致
    18         // 队列空,或队列中节点类型和当前节点一致,
    19         // 即我们说的第一种情况,将节点入队即可。读者要想着这块 if 里面方法其实就是入队
    20         if (h == t || t.isData == isData) { // empty or same-mode
    21             QNode tn = t.next;
    22             // t != tail 说明刚刚有节点入队,continue 即可
    23             if (t != tail)                  // inconsistent read
    24                 continue;
    25             // 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可
    26             if (tn != null) {               // lagging tail
    27                 // 这个方法就是:如果 tail 此时为 t 的话,设置为 tn
    28                 advanceTail(t, tn);
    29                 continue;
    30             }
    31             // 
    32             if (timed && nanos <= 0)        // can't wait
    33                 return null;
    34             // s == null,则创建一个新节点
    35             if (s == null)
    36                 s = new QNode(e, isData);
    37             // 将当前节点,插入到 tail 的后面
    38             if (!t.casNext(null, s))        // failed to link in
    39                 continue;
    40 
    41             // 将当前节点设置为新的 tail
    42             advanceTail(t, s);              // swing tail and wait
    43             // 看到这里,请读者先往下滑到这个方法,看完了以后再回来这里,思路也就不会断了
    44             Object x = awaitFulfill(s, e, timed, nanos);
    45             // 到这里,说明之前入队的线程被唤醒了,准备往下执行
    46             // 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点
    47             if (x == s) {                   // wait was cancelled
    48                 clean(t, s);
    49                 return null;
    50             }
    51             // 若s节点被设置为取消
    52             if (!s.isOffList()) {           // not already unlinked
    53                 advanceHead(t, s);          // unlink if head
    54                 if (x != null)              // and forget fields
    55                     s.item = s;
    56                 s.waiter = null;
    57             }
    58             return (x != null) ? x : e;
    59 
    60         // 这里的 else 分支就是上面说的第二种情况,有相应的读或写相匹配的情况
    61         } else {                            // complementary-mode
    62             QNode m = h.next;               // node to fulfill
    63             // 不一致读,表明有其他线程修改了队列
    64             if (t != tail || m == null || h != head)
    65                 continue;                   // inconsistent read
    66 
    67             Object x = m.item;
    68             if (isData == (x != null) ||    // m already fulfilled
    69                 x == m ||                   // m cancelled
    70                 !m.casItem(x, e)) {         // lost CAS
    71                 advanceHead(h, m);          // dequeue and retry
    72                 continue;
    73             }
    74 
    75             advanceHead(h, m);              // successfully fulfilled
    76             LockSupport.unpark(m.waiter);
    77             return (x != null) ? x : e;
    78         }
    79     }
    80 }
    81 
    82 void advanceTail(QNode t, QNode nt) {
    83     if (tail == t)
    84         UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
    85 }

    第一次offer(command)时,我们可以看到 transfer 方法中 第32行处 timed && nanos <= 0 成立,此时return null,则offer返回false,所以第一次添加任务时,就会执行最后的 else if (!addWorker(command, false)) 添加一个worker,如果这个worker执行完任务,在getTask()中从等待队列中取任务,这时如果有线程提交任务,则在 if (isRunning(c) && workQueue.offer(command)) 处给到空闲的线程;如果等待超过60秒,则关闭此线程;如果此时线程还在执行任务,还有线程提交任务,则还会执行到最后的 else if (!addWorker(command, false)) 添加一个worker。

    SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,可以看我的另一篇关于 BlockingQueue 的文章。

    第一次offer(command)时,如果此时没有相反操作的在getTask,这时添加队列并不会阻塞,直接返回false,然后创建一个worker,执行当前任务,当前worker在60秒内如果有其他线程offer,则会继续getTask执行任务,如果超时60秒,则会回收当前worker,如果并发很多同时提交任务,并且处理任务过慢,则会同时创建很多线程,因为没有空闲的线程等待getTask。如果在60秒内执行完任务,且又有任务来 ,则入队的线程直接将任务给空闲的线程

  • 相关阅读:
    1111实验二 作业调度模拟实验
    1006实验一实验报告
    0909对操作系统的认识
    南阳OJ-138 找球号(二)(hash表应用)
    南阳OJ-38 布线问题(最小生成树应用_prim)
    插入排序
    南阳OJ-756 重建二叉树(二叉树的中序遍历和后序遍历求先序遍历)
    南阳OJ-63 小猴子下落(数据结构-二叉树)
    UVA OJ-11095 Maximum Product(暴力求解法)
    UVA OJ-725 Division (暴力求解法)
  • 原文地址:https://www.cnblogs.com/java-chen-hao/p/10265938.html
Copyright © 2011-2022 走看看