zoukankan      html  css  js  c++  java
  • JAVA并发-Executor

    结构

    类继承图:

    clipboard.png

    上面的各个接口/类的关系和作用:

    1. Executor
      执行器接口,也是最顶层的抽象核心接口, 分离了任务和任务的执行。
    2. ExecutorService
      在Executor的基础上提供了执行器生命周期管理,任务异步执行等功能。
    3. ScheduledExecutorService
      在ExecutorService基础上提供了任务的延迟执行/周期执行的功能。
    4. Executors
      生产具体的执行器的静态工厂
    5. ThreadFactory
      线程工厂,用于创建单个线程,减少手工创建线程的繁琐工作,同时能够复用工厂的特性。
    6. AbstractExecutorService
      ExecutorService的抽象实现,为各类执行器类的实现提供基础。
    7. ThreadPoolExecutor
      线程池Executor,也是最常用的Executor,可以以线程池的方式管理线程。
    8. ScheduledThreadPoolExecutor
      在ThreadPoolExecutor基础上,增加了对周期任务调度的支持。
    9. ForkJoinPool
      Fork/Join线程池,在JDK1.7时引入,时实现Fork/Join框架的核心类。

    ThreadPoolExecutor

    线程工厂

    ThreadPoolExecutor在构造时如果用户不指定ThreadFactory,则默认使用Executors.defaultThreadFactory()创建一个ThreadFactory,即Executors.DefaultThreadFactory:

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    
    /**
     * 默认的线程工厂.
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
     
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }
     
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    

    线程池中任务线程Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 这个是真正的线程,任务靠你啦
        final Thread thread;
        // 前面说了,这里的 Runnable 是任务。为什么叫 firstTask?因为在创建线程的时候,如果同时指定了
        // 这个线程起来以后需要执行的第一个任务,那么第一个任务就是存放在这里的(线程可不止执行这一个任务)
        // 当然了,也可以为 null,这样线程起来了,自己到任务队列(BlockingQueue)中取任务(getTask 方法)就行了
        Runnable firstTask;
        // 用于存放此线程完成的任务数,注意了,这里用了 volatile,保证可见性
        volatile long completedTasks;
        // Worker 只有这一个构造方法,传入 firstTask,也可以传 null
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 调用 ThreadFactory 来创建一个新的线程
            this.thread = getThreadFactory().newThread(this);
        }
        // 这里调用了外部类的 runWorker 方法
        public void run() {
            runWorker(this);
        }
        ...// 其他几个方法没什么好看的,就是用 AQS 操作,来获取这个线程的执行权,用了独占锁
    }
    

    特别要注意这里的初始化,将this 任务分配给了线程thread ,后面调用线程start,即调用Worker中的run`方法。

    注意点

    看源码要注意两个对象

    一个是任务队列BlockingQueue<Runnable> workQueue

    一个是任务线程集合HashSet<Worker> workers

    线程池进度流程

    • 工作线程数小于等于corePoolSize,这里将任务firstTask包装成worker,并新增到worker

    • 工作线程数大于corePoolSize且可以入队,这里将任务command入队到workQueue

      • 这里有一个特殊点addWorker(null, false),这里是为了防止任务入队了,但是工作线程为0,这里就是为了新增一个工作线程,即再新增一个firstTasknullworkerworkers中。
    • workQueue队列已满,小于maximumPoolSize,再次将任务firstTask包装成worker,并新增到workers中。

    上述的firstTask包装成worker,当worker中的线程启动时,会自旋不停的处理任务,第一个处理的就是worker中的firstTask,如果firstTask处理结束或firstTasknull,那么继续调用getTask方法从队列workQueue中获取任务处理。

    参考:

    Java多线程进阶(三九)—— J.U.C之executors框架:executors框架概述

    深度解读 java 线程池设计思想及源码实现

  • 相关阅读:
    Libvirt错误总结
    linux学习
    HMC 命令行登陆设置
    AIX扩VG和扩文件系统
    Bootstrap CSS2
    Bootstrap CSS
    JQuery的实例集合
    JQuery的noConflict()方法
    android的intent实现页面的跳转
    android的activity
  • 原文地址:https://www.cnblogs.com/hongdada/p/11983483.html
Copyright © 2011-2022 走看看