zoukankan      html  css  js  c++  java
  • JAVA-ThreadPoolExecutor 线程池

    一、创建线程池

    /**
     * @param corePoolSize 核心线程池大小
     * 当提交一个任务到线程池时,如果当前 poolSize < corePoolSize 时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程
     * 等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有基本线程。
     *
     * @param maximumPoolSize 最大线程池大小
     * 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
     * 值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
     *
     * @param keepAliveTime 线程活动保持时间
     * 线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
     *
     * @param unit 线程活动保持时间的单位
     * TimeUnit.DAYS:天
     * TimeUnit.HOURS:小时
     * TimeUnit.MINUTES:分钟
     * TimeUnit.MILLISECONDS:毫秒
     * TimeUnit.MICROSECONDS:微秒,千分之一毫秒
     * TimeUnit.NANOSECONDS:纳秒,千分之一微秒
     *
     * @param workQueue 保存等待执行的任务的阻塞队列
     * ArrayBlockingQueue:一个基于数组结构的有界阻塞队列,按FIFO(先进先出)进行排序
     * LinkedBlockingQueue:一个基于链表结构的阻塞队列,按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue。
     * SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue。
     * PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
     *
     * @param threadFactory 创建线程的工厂
     * 使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字
     * ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
     *
     * @param handler maxmumPoolSize + workQueue 都满了之后处理新提交任务的策略
     * AbortPolicy:直接抛出异常(默认)。
     * CallerRunsPolicy:只用调用者所在线程来运行任务。
     * DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
     * DiscardPolicy:不处理,丢弃掉。
     * 也可实现 RejectedExecutionHandler 接口自定义策略,如记录日志或持久化存储不能处理的任务。
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    二、使用线程池

    2.1.提交无返回值任务 execute(),输入的任务是一个 Runnable 类的实例,无法判断任务是否被线程池执行成功

    // 线程工厂,这里主要用来设置线程名字
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    // 创建线程池
    ThreadPoolExecutor singleThreadPool = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024),
            namedThreadFactory,
            new ThreadPoolExecutor.AbortPolicy());
    // 向线程池提交任务
    singleThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()));

    2.2.提交有返回值任务 submit(),输入的任务是一个 Callable 或 Runnable 类的实例,有返回值,且可抛出异常,可中断线程

    // 线程工厂,这里主要用来设置线程名字
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    // 创建线程池
    ThreadPoolExecutor singleThreadPool = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024),
            namedThreadFactory,
            new ThreadPoolExecutor.AbortPolicy());
    // 添加任务
    Future<String> future = singleThreadPool.submit(new Callable<String>() {
        @Override
        public String call() {
            return Thread.currentThread().getName();
        }
    });
    
    // 获取结果,会阻塞当前线程,这里调用了有参方法,指定了阻塞时间,若在时间内未执行完则获取结果会报错
    System.out.println("结果:" + future.get(1, TimeUnit.SECONDS));
    System.out.println("是否执行完成:" + future.isDone());

    2.3.关闭线程池 shutdown() 与 shutdownNow()

    // 调用后,不可以再submit新的task,已经submit的将继续执行。会遍历已经在线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程
    singleThreadPool.shutdown();
    // 首先将线程池的状态设置成STOP,然后试图停止当前正执行或暂停的 task 的线程,并返回尚未(等待)执行的 task 的 list
    List<Runnable> runnables = singleThreadPool.shutdownNow();

    三、线程池处理流程

    对应到代码中 ThreadPoolExecutor 的 execute() 方法

    四、ThreadPoolExecutor 源码

     execute 执行任务方法

    public void execute(Runnable command) {
        // 如果提交了空的任务则抛出异常
        if (command == null)
            throw new NullPointerException();
    
        // 分三步
    
        int c = ctl.get();
        // 1.当前工作线程数量是否小于核心线程数量
        if (workerCountOf(c) < corePoolSize) {
            //启动新线程(核心),对 addWorker 的调用以原子方式检查 runState 和 workerCount
            if (addWorker(command, true))
                return;
            // 如果提交失败 则二次检查状态
            c = ctl.get();
        }
    
        // 2.如果线程池处于运行状态,则添加任务到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 添加到队列成功,再检查一次线程池的状态,如果线程池关闭了,就将刚才添加的任务从队列中移除,并执行拒绝策略
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池线程空,则添加一个新线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    
        // 3.尝试添加一个新线程(非核心),新增失败则已关闭或饱和,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

    addWorker 添加任务方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 大致分为两部分
        
        // 1.增加线程池个数
        retry:
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 检查当前线程池状态是否是 SHUTDOWN、STOP、TIDYING 或者 TERMINATED
            // 且!(当前状态为SHUTDOWN,且传入的任务为null,且队列不为null)
            // 条件都成立则返回 false
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;
    
            for (; ; ) {
                int wc = workerCountOf(c);
                // 如果当前的线程数量超过最大容量或者大于(根据传入的 core 决定)核心线程数 || 最大线程数,则返回 false
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 尝试修改线程数,cas 操作
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                // 判断线程池的状态是否改变
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        
        // 2.将任务添加到 workers 里面并执行
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 新建一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 判断线程池的状态
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        // 检查线程是否已经启动
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 将线程添加到线程池中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 设置新增标志
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果 worker 是新增的,就启动该线程
                if (workerAdded) {
                    t.start();
                    // 成功启动了线程,设置对应的标志
                    workerStarted = true;
                }
            }
        } finally {
            // 判断线程是否启动成功
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    t.start() 实际调用的是 runWorker()

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允许其他线程来中断自己
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 循环获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 检查线程池状态
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 提供给继承类使用做一些统计之类的事情,在线程运行前调用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 提供给继承类使用做一些统计之类的事情,在线程运行之后调用
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 统计当前线程完成了多少个任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作
            processWorkerExit(w, completedAbruptly);
        }
    }

    getTask() 获取任务

    private Runnable getTask() {
        // 最后一次 poll() 是否超时
        boolean timedOut = false;
    
        for (; ; ) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 仅在必要时检查队列是否为空,如果线程池已经关闭了,就直接返回 null
            // SHUTDOWN 状态表示执行了 shutdown() 方法,STOP 表示执行了 shutdownNow() 方法
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            // 线程数量
            int wc = workerCountOf(c);
    
            // 核心 worker 是否超时,当前正在运行的 worker 数量超过了 corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果上一次循环从队列获取到的为 null,这时 timedOut 就会为 true 了
            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                // 通过 cas 来设置 WorkerCount,多个线程竞争,只有一个可以设置成功
                // 没设置成功,进入下一次循环,可能下次 worker 的数量就没有超过 corePoolSize,也就不用销毁 worker
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 获取任务,超过 keepAliveTime 时间还没有任务进队列就会返回 null,worker 会销毁
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    https://www.jianshu.com/p/098819be088c

    https://blog.csdn.net/u013332124/article/details/79587436

    http://www.cnblogs.com/fixzd/p/9253203.html

  • 相关阅读:
    Windows7平台下gitblit服务器安装
    使用JDK的zip编写打包工具类
    MongoDB和Java(7):MongoDB用户管理
    MongoDB和Java(6):Spring Data整合MongoDB副本集、分片集群
    MongoDB和Java(5):Spring Data整合MongoDB(注解配置)
    MongoDB和Java(4):Spring Data整合MongoDB(XML配置)
    MongoDB和Java(3):Java操作MongoB
    MongoDB和Java(2):普通用户启动mongod进程
    Spring MVC 执行原理
    选择性配置-ConditionalOnProperty
  • 原文地址:https://www.cnblogs.com/jhxxb/p/10833160.html
Copyright © 2011-2022 走看看