zoukankan      html  css  js  c++  java
  • 原创:ThreadPoolExecutor线程池深入解读(一)----原理+应用

    本文档,适合于对多线程有一定基础的开发人员。对多线程的一些基础性的解读,请参考《java并发编程》的前5章。
    对于源代码的解读,本人认为可读可不读。如果你想成为一位顶级的程序员,那就培养自己底层的逻辑能力,自己写算法,然后让别人学习你的源代码。研究源代码这件事,更多的是针对于初学者。贡献源码的人,也是程序员,只不过是级别不同,或者在理论上,更加高屋建瓴。在现实中,能够兼顾理论和编程的程序员不多,如果谁想成为一流程序员的话,建议从理论上入手,代码量不能代表全部。对于多线程,本人仍然认为,理论很重要。

    多线程编程,在软件开发中占有十分重要的地位。本人对线程同步的本质的理解是:把对一个或者多个的共享状态的复合操作转变为原子性的操作,同时保证共享状态在内存中的可见性。抽象起来就是原子性和可见性。

    1.多线程并发时,会存在竞态条件。常见的竞态条件包括先检查后执行机制的竞争和原子性操作竞争,比如同时对一个整数++操作,这个操作可以分割为三个步骤:读取、加法操作与写入(生效)。解决先检查后执行机制的竞态条件的有效手段是采用双检索。对方法加锁,会大大滴降低吞吐量和性能,因此,不建议直接对方法加锁,常见的做法是,对多个线程同时竞争的变量加锁,或者采用ReentrantLock底层的CAS算法(free-lock).如果想深入理解ReentrantLock的原理,请查看java.util.concurrent包下的源代码。
    2.任务执行策略与中断策略和饱和策略:在多线程环境中,当定义好了公共资源类,与执行任务时(比如生产者与消费者任务),接下来就要考虑任务执行策略与中断策略和饱和策略,以提升系统的吞吐量和性能,同时在运行时,要考虑吞吐量与CPU占有率的折中。在多线程中,最重要的就是以上三种策略的定制。采用默认的,不一定能满足要求。线程池底层,调用的是ThreadPoolExecutor这个类,我们可以扩展他,实现自己的需求。在这里,先讲一下,默认的任务执行策略。(任务执行策略包括:是否为每一个任务开启一个线程,还是所有任务在一个线程中执行,任务执行的顺序,比如FIFO,还是按照优先级等等),所以, 这里涉及到两个比较重要的东西:一是数量问题,包括线程池的基本容量,最大容量以及BlockingQueue<Runnable> 是采用有界的还是无界的,二是BlockingQueue的数据结构,如果执行顺序是FIFO,就采用非优先级的Queue,如果是按优先级,那就使用PriorityLinkedQueue。下面,结合一下ThreadPoolExecutor源代码讲解一下:
    在使用时,我们一般会这样:
    ExecutorService executor = Executors.newCachedThreadPool();
    executor.execute(Runnable);
    先从execute方法开始,一层一层剖析:
    ThreadPoolExecutor中的几个重要变量:
     
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
     
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
     
    The workerCount is the number of workers that have been permitted to start and not permitted to stop. 
    ctl是一个重要的变量,主要包装两个重要的概念:一是workerCount:effective number of threads,二是runState:  indicating whether running, shutting down etc   
    英文解释:
    The main pool control state, ctl, is an atomic integer packing
    two conceptual fields
    workerCount, indicating the effective number of threads
    runState,    indicating whether running, shutting down etc
    在以上状态变量中,RUNNING可以接受新的task,并且可以处理queue中的task,SHUTDOWN不可以接受新的task,但是可以处理queue中的task,其他的全都不可以。还是英文解释比较好,研究源代码,最好是看英文原版的,不要看汉语版的:
    RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {---------------------①
                if (addWorker(command, true))//如果添加失败,返回false,可能是由于创建线程时遇到意外,比如terminated,重新调用ctl.get()计算wc
                    return;
                c = ctl.get();
            }//如果当前执行的线程数量小于corePoolSize,但是添加任务时,遇到了意外,或者,当前执行的线程数量大于corePoolSize,这两种情况,都会进入②处代码
            if (isRunning(c) && workQueue.offer(command)) {------------------②//如果当前线程池中的线程正处于RUNNING状态,并且阻塞队列的容量没有达到上限,重新检查ctl.get()返回的状态
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);//如果此处状态不是RUNNING,也不是SHUTDOWN,那么,拒绝任务
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);//由于任务放到了BlockingQueue中,此处,在Worker中,不添加task,而是运行任务时,从queue取出task
            }
            else if (!addWorker(command, false))-------------------------③//除了以上情况以外,比如BlockingQueue饱和了,线程池容量也饱和了,执行饱和策略,默认为AbortPolicy,拒绝任务
                reject(command);
        }
     
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
     
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
     
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))//此处判断非常重要
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
     
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                w = new Worker(firstTask);------------------------------①//把firstTask加到Worker中,并创建一个线程
                final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
                        int rs = runStateOf(c);
     
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);------------------------------②//把worker加到Set<Worker>中
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;------------------------------③//添加成功
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();------------------------------④执行任务
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);------------------------------⑤//添加失败,从Set<Worker>中移除Worker
            }
            return workerStarted;
        }
    接下来,看看Woker:
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
     
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
     
     
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
     
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
     
             public void run() {----------------①
                runWorker(this);----------------②
             }
     
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);//重新置为0
                return true;
            }
     
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
     
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    Worker的本质是Runnable,因此在addWorker()中的t.start()中,实际是调用worker的run()方法,看②处的runWorker()方法:
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {---------------------①
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);-------------------------②
                        Throwable thrown = null;
                        try {
                            task.run();-------------------------③
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);-------------------------④
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    最重要的地方,已经做了标识。对于①处,(task = getTask()) != null,这是在execute方法中,当workerCountof(recheck)== 0时,把task放到BlockingQueue中,所以用getTask()取出task。在execute之前和之后,可以做一些事情,自定义扩展,比如实现统计和计时功能。
    以上为ThreadPoolExecutor源代码的关键地方的比较粗浅的解读,下面,来进入应用阶段:
    Executors.newFixedThreadPool(x)中,默认的,BlockingQueue为无界的LinkedBlockingQueue,使用无界的queue,会因为queue的无限制扩展,而导致资源被耗尽,Executors.newCachedThreadPool()中,线程池的大小没有限制,队列采用的是SynchronousQueue,SynchronousQueue本质上并不是一个队列,而是基于线程间传递机制的一种运行策略。当向SynchronousQueue中添加task时,必须保证线程在等待接收task,可以与运行的线程直接交互。如果需要实现线程池的容量和queue的容量都有限制,并且需要自定义执行策略和饱和策略时,可以扩展ThreadPoolExecutor。ThreadPoolExecutor的构造器中结束如下参数:
     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    其中有:colrePoolSize,线程池的基本大小, maximumPoolSize,线程池中能够同时运行的线程数量的上限,keepAliveTime,超过此时间,空闲线程将被回收,阻塞队列Blockin共Queue,还有RejectedExecutionHandler,任务拒绝处理类。
    下面, 自定义线程池,实现计时和统计功能,并且自定义有界队列以及饱和策略
    package httpClient;
     
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.logging.Logger;
     
    /**
     * 自定义线程池,实现计时和统计功能,并且自定义有界队列以及饱和策略
     * @author TongXueQiang
     * @date 2016/05/19
     */
    public class MyThreadPoolExecutor extends ThreadPoolExecutor {
        private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
        private final Logger log = Logger.getLogger("MyThreadPoolExecutor");
        private final AtomicLong numTasks = new AtomicLong(1);
        private final AtomicLong totalTime = new AtomicLong();
     
        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
     
        }
        /**
         * 任务执行前
         */
        protected void beforeExecute(Thread t,Runnable r){
            super.beforeExecute(t, r);
            log.fine(String.format("Thread %s: start %s",t,r));
            startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));
        }
        /**
         * 任务执行后
         * @param r 任务
         * @param t 执行任务的线程
         */
        protected void afterExecutor(Runnable r,Throwable t){
            try {
                Long endTime = (long) (System.nanoTime() / Math.pow(10,9));
                Long taskTime = endTime - startTime.get();
                numTasks.incrementAndGet();
                totalTime.addAndGet(taskTime);
                log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));
            } finally {
                super.afterExecute(r, t);
            }
        }
     
        protected void terminated () {
            try {
                log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));
            } finally {
                super.terminated();
            }        
        }
    }
     
    //自定义简易爬虫
    package httpClient;
     
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    /**
     * 网页抓取
     * @author TongXueQiang
     * @date 2016/05/16
     */
    public class UrlHanding {
        private final int THREADS = 10;
        private final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
        private final ExecutorService consumerExecutor = new MyThreadPoolExecutor(10, 10, 1000,TimeUnit.MILLISECONDS, q, new ThreadPoolExecutor.CallerRunsPolicy());//调用者执行的饱和策略
        private final CountDownLatch startLatch = new CountDownLatch(1);
        private final CountDownLatch endLatch = new CountDownLatch(THREADS);
        private static UrlQueue queue;
     
        public void urlHanding(String[] seeds) throws InterruptedException {        
            queue = getUrlQueue();
            System.out.println("处理器数量:"+Runtime.getRuntime().availableProcessors());
            long start = (long) (System.nanoTime() / Math.pow(10, 9));
            producerExecutor.execute(new GetSeedUrlTask(queue,seeds,startLatch));        
            producerExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
            producerExecutor.shutdown();
            startLatch.await();
     
            UrlDataHandingTask []url_handings = new UrlDataHandingTask[THREADS];
            for (int i = 0;i < THREADS;i++) {
                url_handings[i] = new UrlDataHandingTask(startLatch,endLatch,queue);
                consumerExecutor.execute(url_handings[i]);            
            }
            consumerExecutor.shutdown();
            startLatch.countDown();
            doSomething();
            endLatch.await();
     
            long end = (long) (System.nanoTime() / Math.pow(10,9) - start);
            System.out.println("耗时: " + end + "秒");
        }
     
        private void doSomething() {
     
     
        }
     
        private UrlQueue getUrlQueue() {
            if (queue == null) {
                synchronized(UrlQueue.class){
                    if (queue == null) {
                        queue = new UrlQueue();
                        return queue;
                    }
                }
            }
            return queue;
        }
    }
    上面,是典型的生产者和消费者线程模式,把ArrayBlockingQueue当做公共资源,这里,要处理好消费者线程无限期阻塞的问题,通过在queue的最后加入“毒丸”对象,当每个线程从queue中取出的对象为“毒丸”对象时,停止迭代。
    以下为消费者线程:
    package httpClient;
     
    import java.util.concurrent.CountDownLatch;
     
    public class UrlDataHandingTask implements Runnable {
        private CountDownLatch startLatch;
        private CountDownLatch endLatch;
        private UrlQueue queue;
     
        public UrlDataHandingTask(CountDownLatch latch, CountDownLatch endLatch, UrlQueue queue) {
            this.startLatch = latch;
            this.endLatch = endLatch;
            this.queue = queue;        
        }
     
        /**
         * 下载对应的页面并抽取出链接,放入待处理队列中
         * 
         * @param url
         * @throws InterruptedException
         */
        public void dataHanding(String url) throws InterruptedException {
            getHrefOfContent(DownPage.getContentFromUrl(url));
            for (String url0 : VisitedUrlQueue.visitedUrlQueue) {
                System.out.println(url0);
            }
        }
     
        @Override
        public void run() {
            try {
                startLatch.await();
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
            }
     
            while (!queue.isEmpty()) {
                try {
                    String url = queue.outElem();
                    if ("".equals(url.trim())) {//“毒丸”对象为空
                        queue.addElem(url);
                        break;
                    }
                    dataHanding(url);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            endLatch.countDown();
     
        }
     
        /**
         * 获取页面源代码中的超链接
         * 
         * @param content
         * @throws InterruptedException
         */
        public void getHrefOfContent(String content) throws InterruptedException {
            System.out.println("开始");
            String[] contents = content.split("<a href="");
            for (int i = 1; i < contents.length; i++) {
                int endHref = contents[i].indexOf(""");
                String aHref = FunctionUtils.getHrefOfInOut(contents[i].substring(0, endHref));
                if (aHref != null) {
                    String href = FunctionUtils.getHrefOfInOut(aHref);
                    if (queue.isContains(href) && !VisitedUrlQueue.isContains(href)
                            && href.indexOf("/code/explore") != -1) {
                        // 放入待抓取队列中
                        queue.addElem(href);
                    }
                }
            }
            System.out.println(queue.size() + "--抓取到的连接数");
            System.out.println(VisitedUrlQueue.size() + "--已处理的页面数");
        }
     
    }
    生产者线程:
    package httpClient;
     
    import java.util.concurrent.CountDownLatch;
     
    public class GetSeedUrlTask implements Runnable {
        private UrlQueue queue;
        private String[] seeds;
        private CountDownLatch startLatch;
     
        public GetSeedUrlTask(UrlQueue queue, String[] seeds,CountDownLatch startLatch) {
            this.queue = queue;
            this.seeds = seeds;
            this.startLatch = startLatch;
        }
     
        public void addUrl() {
            try {
                for (String url : seeds) {
                    queue.addElem(url);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
     
        @Override
        public void run() {
            addUrl();        
            try {
                queue.addElem("");//加入“毒丸”对象
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            startLatch.countDown();
        }
    }
    未完待续……
     
     
  • 相关阅读:
    关于软件设计中遇到的问题
    关于power shell
    如果TChart的发生异常
    重温gof版《设计模式》中的创建型模式
    如何更好的使用cvs
    存储过程返回临时表的问题
    关于bugzilla与svn结合的配置注意事项
    VC知识点:如何用vc6调试CGI程序
    如何让应用程序托盘化
    符号表
  • 原文地址:https://www.cnblogs.com/txq157/p/5509542.html
Copyright © 2011-2022 走看看