zoukankan      html  css  js  c++  java
  • Executor

     Executor是一个简单的标准化接口,用于定义自定义线程类子系统,包括线程池,异步I / O和轻量级任务框架。根据正在使用的具体Executor类,任务可以在新创建的线程,现有任务执行线程或线程调用中execute执行,并且可以顺序执行或同时执行。 ExecutorService提供了更完整的异步任务执行框架。ExecutorService管理任务的排队和调度,并允许受控关闭。该ScheduledExecutorService 子接口及相关的接口添加了延迟的和定期任务执行的支持。ExecutorServices提供了安排任何函数的异步执行的方法,表示为Callable结果的模拟Runnable。A Future返回函数的结果,允许确定执行是否已完成,并提供取消执行的方法。A RunnableFuture是Future 拥有一种run方法,在执行时设置其结果。
    class DirectExecutor implements Executor {
        public void execute(Runnable r) {
            r.run();
        }
    }
    class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }
     
    Executor接口的execute是在ThreadPoolExecutor中实现的:
     
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
        }
         else if (!addWorker(command, false))
        reject(command);
    }
     
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    ctl它表示了两个概念:
    workerCount:当前有效的线程数
    runState:当前线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate。
     
    ctl.get()方法是在AtomicInteger类中的
     
    public final int get() {
        return value;
    }
     
     
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
             //由它可以获取到当前有效的线程数和线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);
     
            // 检查队列是否为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
                return false;
     
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
    // 首先会再次检查线程池是否处于运行状态,核心线程池中是否还有空闲线程,都满足条件过后则会调用compareAndIncrementWorkerCount先将正在运行的线程数+1,数量自增成功则跳出循环,自增失败则继续从头继续循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
    //  否则CAS由于workerCount更改而失败;重试内循环
            }
        }
     
    // 正在运行的线程数自增成功后则将线程封装成工作线程Worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
          //全局锁
                final ReentrantLock mainLock = this.mainLock;
          //获取全局锁
                mainLock.lock();
                // 当持有了全局锁的时候,还需要再次检查线程池的运行状态等
                try {
                     //持锁时重新检查。
                    //退出线程工厂失败或如果
                    //获取锁之前关闭。
                    int rs = runStateOf(ctl.get());     //线程池运行状态
     
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())          //线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();     //工作线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;         //新构造的工作线程加入成功
                     }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();         //在被构造为Worker工作线程,且被加入到工作线程集合中后,执行线程任务,注意这里的start实际上执行Worker中run方法,所以接下来分析Worker的run方法
                    workerStarted = true;
                }
            }
        } finally {
        //未能成功创建执行工作线程
            if (! workerStarted)
                addWorkerFailed(w);     //在启动工作线程失败后,将工作线程从集合中移除
        }
        return workerStarted;
    }
     
  • 相关阅读:
    【转】Git: There is no tracking information for the current branch.
    【转】git多个远程仓库
    【转】python批量快速合并excel文件
    【转】HTML5-postMessage实现跨域
    Python3正则表达式search和findall差异讨论
    Python Segmentation fault错误定位办法
    Python3压缩和解压缩实现
    Python3+profile性能分析
    案例:ADG环境遇到redo日志member路径有误以及RMAN-6571错误
    CentOS7的udev的绑定规则
  • 原文地址:https://www.cnblogs.com/gqymy/p/10303347.html
Copyright © 2011-2022 走看看