zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor以及计数器CountDownLatch的使用

    一:ThreadPoolExecutor的使用

      项目中用到夜间跑批的问题,考虑使用多线程同时跑批,每次拿出100批数据,创建定长线程池,长度

    为10,然后将这100批数据分成10份,10个线程各自处理自己的那一部分,每当有一个线程处理完成后就会

    进入等待,计数器减1,直到计数器为0时,说明每个线程都完成了自己的工作,然后进入主线程执行。

    1:创建定长线程池

    1 public static final int THREAD_NUM = 10;
    2     public ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUM);// 创建10个线程放入线程池内

    2:创建线程任务

     1 public class myRun implements Runnable{
     2         
     3         private List<Map<String,Object>> tmpList;
     4         private int threadCount;
     5         private CountDownLatch countdown;
     6         private int oneThreadCount;
     7         
     8         public myRun(List<Map<String, Object>> tmpList, int threadCount,
     9                 CountDownLatch countdown,int oneThreadCount) {
    10             super();
    11             this.tmpList = tmpList;
    12             this.threadCount = threadCount;
    13             this.countdown = countdown;
    14             this.oneThreadCount = oneThreadCount;
    15         }
    16 
    17         @Override
    18         public void run() {
    19             for (int i = threadCount * oneThreadCount; i < (threadCount + 1) * oneThreadCount
    20                     && i < tmpList.size(); i++) {
    21                 System.out.println(Thread.currentThread().getName()+ "#" + i);
    22                 System.out.println(tmpList.get(i));
    23             }
    24             countdown.countDown();// 每个子线程结束后进行线程减1
    25             System.out.println(Thread.currentThread().getName()
    26                     + "##线程结束------------------------");
    27             System.out.println("----------------剩余的线程个数:"
    28                     + countdown.getCount());
    29         }
    30         
    31     }

    3:主线程执行任务

    public void doHandler() {
            try {
                List<Map<String, Object>> list = getList();
                final CountDownLatch countdown = new CountDownLatch(THREAD_NUM);// 子线程数
                int oneThreadCount = list.size()/THREAD_NUM + 1;
                for (int num = 0; num < THREAD_NUM; num++) {// 开启线程个数
                    System.out.println("----------------共有的线程个数:" + countdown.getCount());
                    threadPool.execute(new myRun(list,num,countdown,oneThreadCount));
                }
                countdown.await();// 这里进行同步等待,等所有子线程结束后,执行 countdown.await()后面的代码
                System.out.println("##结束等待------------------------");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    
        public static List<Map<String, Object>> getList() {
            List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
            for (int i = 0; i < 99; i++) {
                Map<String, Object> tmpMap = new HashMap<String, Object>();
                tmpMap.put("id", 1);
                tmpMap.put("name", "rose");
                tmpMap.put("salary", 10000);
                resultList.add(tmpMap);
            }
            return resultList;
        }
        

    4:整个代码如下

     1 package com.hlcui.threadpool;
     2 
     3 import java.util.ArrayList;
     4 import java.util.HashMap;
     5 import java.util.List;
     6 import java.util.Map;
     7 import java.util.concurrent.CountDownLatch;
     8 import java.util.concurrent.ExecutorService;
     9 import java.util.concurrent.Executors;
    10 
    11 public class ThreadCountDemo {
    12     public static final int THREAD_NUM = 10;
    13     public ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUM);// 创建10个线程放入线程池内
    14 
    15     public void doHandler() {
    16         try {
    17             List<Map<String, Object>> list = getList();
    18             final CountDownLatch countdown = new CountDownLatch(THREAD_NUM);// 子线程数
    19             int oneThreadCount = list.size()/THREAD_NUM + 1;
    20             for (int num = 0; num < THREAD_NUM; num++) {// 开启线程个数
    21                 System.out.println("----------------共有的线程个数:" + countdown.getCount());
    22                 threadPool.execute(new myRun(list,num,countdown,oneThreadCount));
    23             }
    24             countdown.await();// 这里进行同步等待,等所有子线程结束后,执行 countdown.await()后面的代码
    25             System.out.println("##结束等待------------------------");
    26         } catch (InterruptedException e) {
    27             e.printStackTrace();
    28         } finally {
    29             threadPool.shutdown();
    30         }
    31     }
    32 
    33     public static List<Map<String, Object>> getList() {
    34         List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
    35         for (int i = 0; i < 99; i++) {
    36             Map<String, Object> tmpMap = new HashMap<String, Object>();
    37             tmpMap.put("id", 1);
    38             tmpMap.put("name", "rose");
    39             tmpMap.put("salary", 10000);
    40             resultList.add(tmpMap);
    41         }
    42         return resultList;
    43     }
    44     
    45     public class myRun implements Runnable{
    46         
    47         private List<Map<String,Object>> tmpList;
    48         private int threadCount;
    49         private CountDownLatch countdown;
    50         private int oneThreadCount;
    51         
    52         public myRun(List<Map<String, Object>> tmpList, int threadCount,
    53                 CountDownLatch countdown,int oneThreadCount) {
    54             super();
    55             this.tmpList = tmpList;
    56             this.threadCount = threadCount;
    57             this.countdown = countdown;
    58             this.oneThreadCount = oneThreadCount;
    59         }
    60 
    61         @Override
    62         public void run() {
    63             for (int i = threadCount * oneThreadCount; i < (threadCount + 1) * oneThreadCount
    64                     && i < tmpList.size(); i++) {
    65                 System.out.println(Thread.currentThread().getName()+ "#" + i);
    66                 System.out.println(tmpList.get(i));
    67             }
    68             countdown.countDown();// 每个子线程结束后进行线程减1
    69             System.out.println(Thread.currentThread().getName()
    70                     + "##线程结束------------------------");
    71             System.out.println("----------------剩余的线程个数:"
    72                     + countdown.getCount());
    73         }
    74         
    75     }
    76 }

    代码均已经验证。

    二:ThreadPoolExecutor代码的分析

    Executors类提供了很多静态方法:

    1:维护单线程的线程池

    2:固定线程数线程池

     

    3:还有来一个任务就创建一个线程的缓存线程池

     

    4:还有这种延迟对列的线程

     这里面有一个很重要的方法,所以的线程池的创建几乎都依赖它,

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

     先看一下它的参数:

    corePoolSize:核心线程数   就是线程池在没有任务时,一般需要维持的线程数,当然,线程池创建后是0,当来第一个任务的时候就创建一个,依次类推

    maximumPoolSize:最大线程数,当核心线程处理不了大量的请求时,就会把请求任务放入对列中,如果对列的容量达到上限,就会启用最大线程,继续创建线程处理请求

    keepAliveTime:保持存活时间,如果当前线程池中维护的线程数大于核心线程数,且线程处于空闲状态,那么这个参数就是线程能够存活的时间

    unit:时间单位,和上一个参数绑定使用

    workQueue:对列,当核心线程数,处理不了当前请求任务时,就会把请求放入对列中

    threadFactory:线程工厂,就是线程池创建线程的工具

    rejectExecutionHandler:拒绝执行策略,当最大线程也处理不了请求任务时,如果这时候还有很多请求过来,那么就会启用拒绝策略

    看一下单线程池:

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    当第一个请求来时,会启动一个线程处理,第一个请求没有处理完,再来一个请求时,就会放入对列。

    看一下这个对列:

    是LinkedBlockQueue,容量为Integer的最大值,可以认为是无界的

     

     后续来的请求会一直向这个对列放,然后线程处理完上一个请求后,会从对列中取下一个请求来处理。

    使用单线程池可以保证任务执行的有序

    再看一下固定线程数线程:

    和单线程池差不多,区别就是核心线程数和最大线程数,使用的对列也是无界LinedBlockingQueue对列

    但是这个线程池可以并发处理请求任务,如果请求任务过多,也会入队列,等核心线程处理完当前任务,再从

    对列中取

    看一下CachedThreadPool这个线程,和其他的线程池不一样,这个线程池维护的线程核心数为0,最大数为Integer最大值

    她使用的对列是同步对列,其实不存储请求任务,它的工作原理是,来一个请求任务,我就创建一个线程处理,处理完之后

    在线程池存活60s,如果再来请求任务,直接使用空闲线程处理。如果没有空闲线程就创建,可以创建很多线程。

    这个线程池比较适合任务处理时间短,并发比较大的请求。

    阿里的开发手册建议开发者不要直接使用Executors类中的静态方法,最好是自己使用ThreadPoolExecutor这个构造函数创建,根据

    自己实际的业务需求定制线程池。

    三:读源码

    下面来看看ThreadPoolExecutors类中的方法

    1 :execute: 核心代码

    a:检查核心线程数

    检查线程池中维护的worker的数量是否小于核心线程数

    if (workerCountOf(c) < corePoolSize) {

    如果小于,则创建新的线程处理请求任务

    if (addWorker(command, true))
    

      

    b:放入对列

    如果不小于,则程序继续向下走:

     if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }

    判断线程池处于运行状态,则把请求放入对列,放入成功后,再次判断线程池状态,如果状态不是运行,

    则将任务从对列删除,然后使用线程池拒绝策略处理请求任务,如果这时工作线程的数量为0(有可能线程抛异常挂掉了),

    则创建线程处理任务

    c:继续启动线程

    如果放入对列失败,则程序继续向下走

     如果创建线程失败,则使用线程池的拒绝策略处理

    这里面比较重要的一个方法是:addWorker

    下面我们来看一下这个方法:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

      

    前面是一个死循环的判断:

    如果线程池的状态是shutdow、stop或者terminated等状态,

    如果请求任务不为空,则退出,shutdown状态后只会把对列内的线程处理完毕,不会再创建新的线程

    或者对列为空,则退出,对列为空说明任务已经处理完毕,不需要再创建新的线程

    如果创建的线程是核心线程,则和核心线程数进行比较,如果大于等于核心线程数,则退出,不创建新的线程

    如果创建的线程是最大线程时,则和最大线程数进行比较,如果大于最大线程数,则退出,不创建新的线程

    boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
    

      

    所有创建的线程都会放到workers集合中,放置成功后,就会

    调用 start方法,然后该线程就处于就绪状态,等待cpu分配时间片

     来看一下创建worker的过程:

     传进来的这个firstTask是我们要执行的任务,请求。

    然后赋值给Worker对象中的一个变量引用,使用我们创建线程池传进来的线程工厂创建线程,如果不传会使用默认线程工厂,

    我们看一下源码:

     

     worker对象作为Runnable对象维护到了线程中,

    Worker对象持有这个Thread对象的引用,这个Thread对象也持有worker对象的引用

    那么当Thread start后,cpu调用Thread run的时候,其实调用的是worker的run方法

     一路跟下来,我们知道这个worker就是run方法中的target,那么我们回过头来看看worker类,

    worker的run方法:

     runWorker方法:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

      

    这个firstTask才是我们执行execute传进来的Runnable对象,

    这里才能执行我们实现Runnable的真正逻辑,如果失败,则把异常抛出去。

    最后这个异常会抛给虚拟机。

    下面来看一下这个方法,红框标注的

    如果这个task!=null 则为执行任务,这个task就是第一次给这个worker分配的任务,如果这个任务已经执行完毕就被置位null,任务被gc了。

    那么这个线程下一次分配到cpu的时间片时,就是执行 task=getTask()这个方法。

     这个方法就是从对列中取出任务执行,

    从工作对列中取出Runnable来执行。

    2: submit 方法

    一般任务execute是执行没有返回值的任务,实现Runnable,submit提交的

    任务一般是需要返回值的,实现Callable,

    submit重载了3个方法,既支持callable,又支持Runnable

    线程池的submit实现在ThreadPoolExecutors的父类 AbstractExecuteService中,我们来看一下

    这里的execute的执行逻辑和上面的execute是一样的,区别就是这里的入参是RunnableFuture对象,当然这个类也实现了Runnable,而execute的入参是Runnable对象,

    一路跟着源码下来,可以看到,把我们传进来的入参封装到了FutureTask里面,FutureTask类又实现了RunnableFuture类

     

    那么根据执行execute的经验,

    当cpu执行Thread的run方法的时候,会执行Worker的run  ,runWorker方法,然后 task.run

    这个task就是FutureTask对象,而不是execute时候我们自己实现Runnable的对象了。

    我们来看一下FutureTask类的run方法:

     它的run方法里面的callable变量才是我们自己实现Callable接口创建的对象,

    处理成功,则把执行结果set到report输出变量上,处理失败则把Throwable对象set到report输出变量上。

    到这里 submit的调用链也结束了,再来看一下取结果。

    get方法:

    先来看一下FutureTask维护的几个状态:

     New 创建一个FutureTask对象

    Completing:任务执行刚完成

    Normal:任务执行完成,马上会被设置成Normal

    当我们收集完FutureTask后,可以通过get方法获取异步的执行结果

     如果futureTask的状态是new或者刚刚完成时,

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    

      

    get的时候会一直在死循环里面执行,知道s的state为normal,也就是结果已经设置到outcome中

    如果当前线程中断了,直接抛异常

    今天的学习就到这里!

  • 相关阅读:
    Flash/Flex学习笔记(30):不用startDrag和stopDrag的对象拖动
    Flash/Flex学习笔记(33):如何用As3协同Flash CS IDE控制MovieClip实例
    Flash/Flex学习笔记(26):AS3自定义右键菜单
    Flash/Flex学习笔记(32):播放音乐并同步显示lyc歌词(适用于Silverlight)
    Flash/Flex学习笔记(35):如何正确监听Stage对象的事件
    jQuery autoComplete 自动完成 支持中文
    黑马程序员视频学习下载地址记录一下
    汉语分词系统 网址
    【转】Lucene.Net 详解
    MongoDB开发学习 开天辟地,经典入门 解决关系型数据库大数据量处理的瓶颈问题
  • 原文地址:https://www.cnblogs.com/warrior4236/p/6055747.html
Copyright © 2011-2022 走看看