zoukankan      html  css  js  c++  java
  • 线程池及线程相关知识及代码分析

    1:使用代码

    1.1:Callable使用

    可以用于在需要有返回值得时候使用。

    Callable<String> callable = new Callable<String>() {

              @Override
              public String call() throws Exception {
                  System.out.println("Running.......2222222222222222.." + Thread.currentThread().getId());

                  return "2222222222222222";
              }
          };
          FutureTask<String> stringFutureTask = new FutureTask<>(callable);
          new Thread(stringFutureTask).start();
          System.out.println(stringFutureTask.get());
    1.2:FutureTask使用

    除了上方的使用方式,还有如下的使用方式,该方式可以指定新线程执行完成之后的输出结果:

    Runnable runnable= new Runnable() {

      @Override
      public void run() {
        System.out.println("Running.......2222222222222222.." + Thread.currentThread().getId());
      }
    };
    String s = "121";
    FutureTask<String> stringFutureTask1 = new FutureTask<String>(runnable, s);
    new Thread(stringFutureTask1).start();
    1.3:线程池
    1.3.1 可缓存线程池

    Executors.newCacheThreadPool():可缓存线程池,先查看池中有没有以前建立的线程,如果有,就直接使用。如果没有,就建一个新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务。

    当一个任务提交时,corePoolSize为0不创建核心线程,SynchronousQueue是一个不存储元素的队列,可以理解为队里永远是满的,因此最终会创建非核心线程来执行任务。

    对于非核心线程空闲60s时将被回收。因为Integer.MAX_VALUE非常大,可以认为是可以无限创建线程的,在资源有限的情况下容易引起OOM异常

    public static ExecutorService newCachedThreadPool() {
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
    }
    public void test() throws ExecutionException, InterruptedException {
      ExecutorService executorService = Executors.newCachedThreadPool();
      executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Running.......11111111111111.." + Thread.currentThread().getId());
            try {
              Thread.sleep(10000L);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
        }
      });
    }
    1.3.2 可重用线程池

    Executors.newFixedThreadPool(int n):创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程。

    public static ExecutorService newFixedThreadPool(int nThreads) {
      return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
    }
    public static void main(String[] args) {
          // 创建一个可重用固定个数的线程池
          ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
          for (int i = 0; i < 10; i++) {
              fixedThreadPool.execute(new Runnable() {
                  public void run() {
                      try {
                          // 打印正在执行的缓存线程信息
                          System.out.println(Thread.currentThread().getName()
                                  + "正在被执行");
                          Thread.sleep(2000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              });
          }
      }
    1.3.3 ScheduledThreadPool

    可以执行定时和周期性的任务

    public static void main(String[] args) {
                    //创建一个定长线程池,支持定时及周期性任务执行——延迟执行
                    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
                    //延迟1秒执行
                    /*scheduledThreadPool.schedule(new Runnable() {
                        public void run() {
                          System.out.println("延迟1秒执行");
                        }
                    }, 1, TimeUnit.SECONDS);*/      
                    //延迟1秒后每3秒执行一次
                    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                        public void run() {
                            System.out.println("延迟1秒后每3秒执行一次");
                        }
                  }, 1, 3, TimeUnit.SECONDS);              
      }
    1.3.4 单线程化的线程池

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    当一个任务提交时,首先会创建一个核心线程来执行任务,如果超过核心线程的数量,将会放入队列中,因为LinkedBlockingQueue是长度为Integer.MAX_VALUE的队列,可以认为是无界队列,因此往队列中可以插入无限多的任务,在资源有限的时候容易引起OOM异常,同时因为无界队列,maximumPoolSize和keepAliveTime参数将无效,因爲没有创建非核心线程。

    public static ExecutorService newSingleThreadExecutor() {
      return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()));
    }
    public class Test {
      public static void main(String[] args) {
          //创建一个单线程化的线程池
          ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
          for (int i = 0; i < 10; i++) {
              final int index = i;
              singleThreadExecutor.execute(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          //结果依次输出,相当于顺序执行各个任务
                          System.out.println(Thread.currentThread().getName() + "正在被执行,打印的值是:" + index);
                          Thread.sleep(1000);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              });
          }
      }
    }

    2 源码分析

    2.1 Callable的使用代码

    2.1.1 Callable接口
    @FunctionalInterface
    public interface Callable<V> {
      /**
        * Computes a result, or throws an exception if unable to do so.
        */
      V call() throws Exception;
    }
    2.1.2 FutureTask类

    一个可取消的异步计算。FutureTask提供了对Future的基本实现,可以调用方法去开始和取消一个计算,可以查询计算是否完成并且获取计算结果。只有当计算完成时才能获取到计算结果,一旦计算完成,计算将不能被重启或者被取消,除非调用runAndReset方法。除了实现了Future接口以外,FutureTask还实现了Runnable接口,因此FutureTask交由Executor执行,也可以直接用线程调用执行(futureTask.run())。

    线程执行的状态值:state,state的取值去下:

    private volatile int state;
    private static final int NEW         = 0;   //表示一个新的任务,初始状态
    private static final int COMPLETING   = 1;   //当任务被设置结果时,处于此状态,这是一个中间状态
    private static final int NORMAL       = 2;   //表示任务正常结束。
    private static final int EXCEPTIONAL = 3;   //表示任务因异常而结束
    private static final int CANCELLED   = 4;   //任务还未执行之前调用cancel(true),处于CANCELLED
    private static final int INTERRUPTING = 5;   //当任务调用cancel(true)中断程序时,任务处于INTERRUPTING状态,这是一个中间状态。
    private static final int INTERRUPTED = 6;   //任务调用cancel(true)中断程序时会调用interrupt()方法中断线程运行,任务状态由INTERRUPTING转变为INTERRUPTED

    ![image](https://img2020.cnblogs.com/blog/1515709/202104/1515709-20210413141900856-740288771.jpg)

    2.1.3 执行流程
    2.1.3.1 第一步:FutureTask<String> stringFutureTask = new FutureTask<>(callable);

    其中构造函数中,会把对应的callable进行赋值,并初始化state为new,初始状态。

    public FutureTask(Callable<V> callable) {
      if (callable == null)
          throw new NullPointerException();
      this.callable = callable;
      this.state = NEW;       // ensure visibility of callable
    }
    2.1.3.2 第二步:new Thread(stringFutureTask).start();

    因为stringFutureTask是实现Runable接口,调用start方法会执行其中的run方法

    public void run() {
    //对线程状态进行校验
      if (state != NEW ||
          !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
          return;
      try {
          Callable<V> c = callable;
          if (c != null && state == NEW) {
              V result;
              boolean ran;
              try {
              //执行callable的call方法
                  result = c.call();
                  ran = true;
              } catch (Throwable ex) {
                  result = null;
                  ran = false;
                  setException(ex);
              }
              if (ran)
              //执行完成之后,将结果执行set,见下
                  set(result);
          }
      } finally {
          // runner must be non-null until state is settled to
          // prevent concurrent calls to run()
          runner = null;
          // state must be re-read after nulling runner to prevent
          // leaked interrupts
          int s = state;
          if (s >= INTERRUPTING)
              handlePossibleCancellationInterrupt(s);
      }
    }

    接上set方法:

    • 1:将执行结果赋值实例变量 private Object outcome;

    • 2:将线程的state状态值修改为NORMAL

    • 3:唤醒所有阻塞的线程

    protected void set(V v) {
      if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = v;
          UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
          finishCompletion();
      }
    }

    接上finishCompletion(),遍历唤醒所有阻塞线程,并callable置null

    private void finishCompletion() {
      // assert state > COMPLETING;
      for (WaitNode q; (q = waiters) != null;) {
          if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
              for (;;) {
                  Thread t = q.thread;
                  if (t != null) {
                      q.thread = null;
                      LockSupport.unpark(t);
                  }
                  WaitNode next = q.next;
                  if (next == null)
                      break;
                  q.next = null; // unlink to help gc
                  q = next;
              }
              break;
          }
      }
      done();
      callable = null;       // to reduce footprint
    }
    2.1.3.3 第三步:stringFutureTask.get()
    • 1:判断执行状态,如果s <= COMPLETING,进入awaitDone方法,其中会阻塞或者进行yield();

      如果是被阻塞,那么在第二步执行完会进行唤醒。

    • 2:否则report(s),根据状态返回执行结果。

    public V get() throws InterruptedException, ExecutionException {
      int s = state;
      if (s <= COMPLETING)
          s = awaitDone(false, 0L);
      return report(s);
    }

    接上awaitDone()方法:

    • 1:s > COMPLETING,返回退出循环;

    • 2:s == COMPLETING。进入可执行态,让出执行资源;

    • 3:创建WaitNode,并且加入到链表的头部;

    • 4:如果有超时机制,则LockSupport.parkNanos(this, nanos);,超时阻塞;

    • 5:进行阻塞。

    private int awaitDone(boolean timed, long nanos)
      throws InterruptedException {
      final long deadline = timed ? System.nanoTime() + nanos : 0L;
      WaitNode q = null;
      boolean queued = false;
      for (;;) {
          if (Thread.interrupted()) {
              removeWaiter(q);
              throw new InterruptedException();
          }

          int s = state;
          if (s > COMPLETING) {
              if (q != null)
                  q.thread = null;
              return s;
          }
          else if (s == COMPLETING) // cannot time out yet
              Thread.yield();
          else if (q == null)
              q = new WaitNode();
          else if (!queued)
              queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
          else if (timed) {
              nanos = deadline - System.nanoTime();
              if (nanos <= 0L) {
                  removeWaiter(q);
                  return state;
              }
              LockSupport.parkNanos(this, nanos);
          }
          else
              LockSupport.park(this);
      }
    }

    接上report()方法:

    • 1:获取运行结果,并将获取结果进行返回;

    • 2:如果s == NORMAL,输出运行结果;

    • 3:其他则抛出异常。

    private V report(int s) throws ExecutionException {
      Object x = outcome;
      if (s == NORMAL)
          return (V)x;
      if (s >= CANCELLED)
          throw new CancellationException();
      throw new ExecutionException((Throwable)x);
    }
    2.1.3.4 第四步:stringFutureTask.cancel(true);
    • 1:线程为NEW,并且修改线程状态成功进入第二步,否则直接返回false;

    • 2:如果mayInterruptIfRunning为false,则finishCompletion();

      如果该值为true,则线程进行中断,修改线程状态为INTERRUPTED。

    public boolean cancel(boolean mayInterruptIfRunning) {
      if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
          return false;
      try {   // in case call to interrupt throws exception
          if (mayInterruptIfRunning) {
              try {
                  Thread t = runner;
                  if (t != null)
                      t.interrupt();
              } finally { // final state
                  UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
              }
          }
      } finally {
          finishCompletion();
      }
      return true;
    }

    2.2 线程池的代码分析

    2.2.1 线程池参数
    /**
    * 线程工厂,创建一个线程使用的工厂,用于设置线程的线程名,是否为daemon线程。
    */
    private volatile ThreadFactory threadFactory;

    /**
    * Handler called when saturated or shutdown in execute.
    * 拒绝策略
    */
    private volatile RejectedExecutionHandler handler;

    /**
    * Timeout in nanoseconds for idle threads waiting for work.
    * Threads use this timeout when there are more than corePoolSize
    * present or if allowCoreThreadTimeOut. Otherwise they wait
    * forever for new work.
    * 对于线程池线程数量大于核心线程或者设置allowCoreThreadTimeOut有效,空闲线程存活的最大时长
    */
    private volatile long keepAliveTime;

    /**
    * Core pool size is the minimum number of workers to keep alive
    * (and not allow to time out etc) unless allowCoreThreadTimeOut
    * is set, in which case the minimum is zero.
    * 线程池维护的最小线程数量,不会被超时回收,除非设置参数allowCoreThreadTimeOut
    */
    private volatile int corePoolSize;

    /**
    * Maximum pool size. Note that the actual maximum is internally
    * bounded by CAPACITY.
    * 线程池的最大数量
    */
    private volatile int maximumPoolSize;
    2.2.2 丢弃策略
    • CallerRunsPolicy

      /**
      * Executes task r in the caller's thread, unless the executor
      * has been shut down, in which case the task is discarded.
      * 使用线程池的线程,运行被拒绝任务的run方法
      */
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
      }
    • AbortPolicy

      /**
      * Always throws RejectedExecutionException.
      * 丢弃任务,并且抛出异常
      */
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                              " rejected from " +
                                              e.toString());
      }
    • DiscardPolicy

      /**
      * Does nothing, which has the effect of discarding task r.
      * 直接丢弃,没有做任何事情
      */
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      }
    • DiscardOldestPolicy

      /**
      * Obtains and ignores the next task that the executor
      * would otherwise execute, if one is immediately available,
      * and then retries execution of task r, unless the executor
      * is shut down, in which case task r is instead discarded.
      * 丢弃进入队列最早的任务,并将被拒绝的任务加入到队列中
      */
      public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
      }
    2.2.3 ThreadPoolExecutor 线程参数
    /**
        * The main pool control state, ctl, is an atomic integer packing
        * two conceptual fields
        * ctl表示线程池的状态
        *   workerCount, indicating the effective number of threads
        *               当前有效的线程数
        *   runState,   indicating whether running, shutting down etc
        *               当前线程的运行状态
    **/    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 ,线程数量所占位数
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; ///低29位表示最大线程数,2^29-1

    // runState is stored in the high-order bits
    private static final int RUNNING   = -1 << COUNT_BITS;   //高3位111
    private static final int SHUTDOWN   = 0 << COUNT_BITS;   ///高3位000
    private static final int STOP       = 1 << COUNT_BITS;   //高3位001
    private static final int TIDYING   = 2 << COUNT_BITS;   ////高3位010
    private static final int TERMINATED = 3 << COUNT_BITS;   //高3位011

    // 获取运行状态,低二十九位变成0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取线程数量,高三为变成0
    private static int workerCountOf(int c) { return c & CAPACITY; }
    //获取线程池状态,运行状态+线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    • 1、RUNNING

    (1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 (02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    • 2、 SHUTDOWN

    (1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。 (2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

    • 3、STOP

    (1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。 (2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

    • 4、TIDYING

    (1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。 (2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

    • 5、 TERMINATED

    (1) 状态说明:线程池彻底终止,就变成TERMINATED状态。 (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

     
    2.2.4 execute(Runnable command)
    /*
            * Proceed in 3 steps:
            *
            * 1. If fewer than corePoolSize threads are running, try to
            * start a new thread with the given command as its first
            * task. The call to addWorker atomically checks runState and
            * workerCount, and so prevents false alarms that would add
            * threads when it shouldn't, by returning false.
            * 如果当前线程数量小于corePoolSize,开启一个新的线程,并将当前任务作为新线程的第一个任务,调用的时         * 候会自动检测运行状态和workerCount,当没有办法添加新的线程的时候,会返回false。
            *
            * 2. If a task can be successfully queued, then we still need
            * to double-check whether we should have added a thread
            * (because existing ones died since last checking) or that
            * the pool shut down since entry into this method. So we
            * recheck state and if necessary roll back the enqueuing if
            * stopped, or start a new thread if there are none.
            * 当前线程池的全部线程数量大于核心线程数量,则将线程放入到任务队列中
            *
            * 3. If we cannot queue task, then we try to add a new
            * thread. If it fails, we know we are shut down or saturated
            * and so reject the task.
            * 如果任务不能进入队列,尝试一次添加线程,如果失败即线程池拒绝接收,抛出异常。
    */
    public void execute(Runnable command) {
      if (command == null)
          throw new NullPointerException();
      int c = ctl.get();
        /*1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中*/
      if (workerCountOf(c) < corePoolSize) {
          if (addWorker(command, true))
              return;
          c = ctl.get();
      }
      /*2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中*/
      if (isRunning(c) && workQueue.offer(command)) {
          int recheck = ctl.get();
          //检测线程不处于运行状态,则删除任务,并执行拒绝策略
          if (! isRunning(recheck) && remove(command))
              reject(command);
          else if (workerCountOf(recheck) == 0)
              addWorker(null, false);
      }
      /*3.插入队列不成功,且当前线程数数量小于最大线程池数量,此时则创建新线程执行任务,创建失败抛出异常*/
      else if (!addWorker(command, false))
          reject(command);
    }
    • 1:addWorker的代码

      private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            //检测运行状态,如果队列只有队列为空则继续
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
                return false;
      //自旋循环
            for (;;) {
                int wc = workerCountOf(c);
                //检测运行线程数合法性
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //在运行线程数量赋值成功之后,跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); // Re-read ctl
                //如果运行状态有更新继续进行外层的自旋循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        //新增一个worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加锁,保证安全
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
      //将新建的work添加到works中
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //新建的work新增成功,进行运行
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
      }

      接上其中:w = new Worker(firstTask);

      /**
      * Creates with given first task and thread from ThreadFactory.
      */
      Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        //将当前task添加到类的实例变量
        this.firstTask = firstTask;
        //根据设置的线程工厂,创建一个线程,线程为this
        this.thread = getThreadFactory().newThread(this);
      }
    • 2:runWorker的代码

      为上面t.start()中,执行新的线程,会调用work中的run方法。

      /** Delegates main run loop to outer runWorker  */
      public void run() {
        runWorker(this);
      }
    final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      w.unlock(); // allow interrupts
      boolean completedAbruptly = true;
      try {
      //如果task为null就通过getTask方法获取阻塞队列中的下一个任务
          //getTask方法一般不会返回null,所以这个while类似于一个无限循环
          //worker对象就通过这个方法的持续运行来不断处理新的任务
          while (task != null || (task = getTask()) != null) {
              w.lock();
              // If pool is stopping, ensure thread is interrupted;
              // if not, ensure thread is not interrupted. This
              // requires a recheck in second case to deal with
              // shutdownNow race while clearing interrupt
              //如果线程池将要结束,需要保证线程interrupted,如果不是,则要保证线程不是interrupted
              if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                  wt.interrupt();
              try {
              //执行前置逻辑,便于扩展
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                  //运行线程逻辑
                      task.run();
                  } catch (RuntimeException x) {
                      thrown = x; throw x;
                  } catch (Error x) {
                      thrown = x; throw x;
                  } catch (Throwable x) {
                      thrown = x; throw new Error(x);
                  } finally {
                  //执行后置逻辑
                      afterExecute(task, thrown);
                  }
              } finally {
                  task = null;
                  w.completedTasks++;
                  w.unlock();
              }
          }
          completedAbruptly = false;
      } finally {
          processWorkerExit(w, completedAbruptly);
      }
    }
    • 3:getTask()代码

      private Runnable getTask() {
        // 通过timeOut变量表示线程是否空闲时间超时了
        boolean timedOut = false; // Did the last poll() time out?
      //循环
        for (;;) {
            //获取线程状态参数
            int c = ctl.get();
            //获取线程池运行状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
      //获取线程池线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?  
            // 判断当前线程是否会被超时销毁
            // 会被超时销毁的情况:线程池允许核心线程超时 或 当前线程数大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      // 如果 (当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
            //   且 (当前线程数大于1 或 阻塞队列为空) —— 该条件在阻塞队列不为空的情况下保证至少会保留一 个线程继续处理任务
            // 则 减少worker计数并返回null(返回null会导致当前worker被销毁)
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 从阻塞队列中取出一个任务(如果队列为空会进入阻塞等待状态)
                // 如果允许空闲超时销毁线程的话则带有一个等待的超时时间,如果超时没有获取会返回null,则 当前worker会结束
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 如果任务为null,则说明发生了等待超时,将空闲时间超时标志设置为true,在下一次循环的时 候会较少线程池数量,并返回null
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
      }

    基于代码分析,细化几种线程池的流程:

    • 1:当前线程池的线程小于核心线程池数量的时候,会创建新的worker,并运行;

      worker除了执行第一个任务,还会阻塞获取workQueue中的任务,根据是否需要超时丢弃线程进行判断获取

    • 2:当大于核心线程池数量的时候,会加入到阻塞队列中,由(1)中的创建线程监听队列的线程进行获取,执行;

    • 3:如果大于核心线程数量,则会继续创建线程,

    • 4:如果3中创建失败或者已大于最大值,则根据丢弃策略执行丢弃方法。

    2.3 延时及周期线程池的代码分析

    延时及周期性线程池执行逻辑于上述代码逻辑不一致,摘取出单独分析。

     ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
      scheduledExecutorService.schedule(new Runnable() {
          @Override
          public void run() {
              System.out.println("I'm running...");
          }
      },10L,TimeUnit.SECONDS);

      scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() {
              System.out.println("I'm running...");
          }
      },10L,10L,TimeUnit.SECONDS);
    }
    2.3.1 线程池的创建

    与上述线程池一致,但是队列使用的是延迟队列。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
      return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
      super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
    }
    2.3.2 执行流程分析
    2.3.2.1 schedule,延时任务
    public ScheduledFuture<?> schedule(Runnable command,
                                      long delay,
                                      TimeUnit unit) {
      if (command == null || unit == null)
          throw new NullPointerException();
          //装饰一个RunnableScheduledFuture
      RunnableScheduledFuture<?> t = decorateTask(command,
      //构造一个ScheduledFutureTask,返回为Void
          new ScheduledFutureTask<Void>(command, null,
                                        triggerTime(delay, unit)));
      delayedExecute(t);
      return t;
    }
    • 1:new ScheduledFutureTask

      ScheduledFutureTask:extends FutureTask<V> implements RunnableScheduledFuture<V>

      其继承与FutureTask,对实例变量进行赋值,将Runable包装成callable。

      /**
      * Creates a one-shot action with given nanoTime-based trigger time.
      */
      ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
      }
    public FutureTask(Runnable runnable, V result) {
      this.callable = Executors.callable(runnable, result);
      this.state = NEW;       // ensure visibility of callable
    }
    • 2:执行的具体方法

      执行延时任务和周期任务的主要方法,如果线程池已经关闭,则拒绝接受任务。否则将任务加入到队列之中。之后ensurePrestart,开始addWork,并且添加null任务,Woker中会一直监听延迟队列中的元素,当符合时间要求就会弹出执行,但最终执行的是ScheduledFutureTask#run方法。

      Main execution method for delayed or periodic tasks.  If pool
      is shut down, rejects the task. Otherwise adds task to queue
      and starts a thread, if necessary, to run it. (We cannot
      prestart the thread to run the task because the task (probably)
      shouldn't be run yet.) If the pool is shut down while the task
      is being added, cancel and remove it if required by state and
      run-after-shutdown parameters.
      private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
      }
    • 3:ScheduledFutureTask#run方法

      其中经过校验,对于延时任务进入ScheduledFutureTask.super.run();即FuntureTask的run方法,RunnableAdapter的call方法,最终会执行用户定义的Runder方法。

       /**
          * Overrides FutureTask version so as to reset/requeue if periodic.
          */
        public void run() {
            boolean periodic = isPeriodic();
            //校验
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            //非周期性的
            else if (!periodic)
                ScheduledFutureTask.super.run();
            //周期性的    
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }
      }
    • 4:ensurePrestart();

      新的任务到来,如果还可以开辟新的核心线程,则开始新的核心线程。

      void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
      }
    2.3.2.2 scheduleAtFixedRate,延时周期任务
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                long initialDelay,
                                                long period,
                                                TimeUnit unit) {
      if (command == null || unit == null)
          throw new NullPointerException();
      if (period <= 0)
          throw new IllegalArgumentException();
          //比2中多了周期的参数
      ScheduledFutureTask<Void> sft =
          new ScheduledFutureTask<Void>(command,
                                        null,
                                        triggerTime(initialDelay, unit),
                                        unit.toNanos(period));
      RunnableScheduledFuture<Void> t = decorateTask(command, sft);
      //将当前等待执行的任务赋值为outerTask
      sft.outerTask = t;
      //进行任务执行,同2
      delayedExecute(t);
      return t;
    }

    最终会执行上面的第三条:

    /**
        * Overrides FutureTask version so as to reset/requeue if periodic.
        */
      public void run() {
          boolean periodic = isPeriodic();
          //校验
          if (!canRunInCurrentRunState(periodic))
              cancel(false);
          //非周期性的
          else if (!periodic)
              ScheduledFutureTask.super.run();
          //周期性的    
          else if (ScheduledFutureTask.super.runAndReset()) {
              setNextRunTime();
              reExecutePeriodic(outerTask);
          }
      }
    }
    • 1:runAndReset()

      Executes the computation without setting its result, and then resets this future to initial state。

      运行线程,并且不返回结果,Funture的状态依然是NEW。

      protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                          null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
      }
    • 2:setNextRunTime();

      设置周期任务的下次执行时间

      /**
      * Sets the next time to run for a periodic task.
      */
      private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
      }
    • 3:reExecutePeriodic

      重新将任务加入队列,并尝试开启新的线程

      void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
        //将任务加入队列之中
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
            //尝试开启新的线程,如果线程数量没有达到corePoolSize
                ensurePrestart();
        }
      }
  • 相关阅读:
    审核被拒:包含隐藏功能
    iOS好的个人博客和平台网站
    免费的Git和SVN服务器
    组件化
    三方生产利器
    RSA加密解密和签名验证机制以及其区别和联系
    APP和后台接口设计规范
    树和二叉树2——输出广义表形式(带括号)二叉树
    树和二叉树1——链式二叉树基础
    计算机图形学5——Two-Dimensional Viewing and Clipping(二维线段裁剪算法)
  • 原文地址:https://www.cnblogs.com/mayang2465/p/14652984.html
Copyright © 2011-2022 走看看