zoukankan      html  css  js  c++  java
  • 并发和多线程(十一)--线程池

    线程池:

      简单来说就是一组可以执行任务的空闲线程,可以用来做任务调度。

    为什么使用线程池?

      1、降低资源消耗。通过重复利用已创建的线程降低创建和销毁线程的消耗.

      2、提高响应速度。任务到达,可以不需要等待创建线程就可以执行.

      3、更好的管理线程。可以统一分配、调优和监控.

    Java通过executor对象来实现自己的线程池模型。

    Executors 类和 Executor 接口

      Executors:包含工厂方法创建不同类型的线程池,其本质就是new了一个ThreadPoolExecutor对象。

      Executor:是个简单的线程池接口,只有一个execute()方法。

    Executor框架结构:

    ExecutorService:

    void execute(Runnable command);
    
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);//可以得到返回值

    线程池种类:

    1、newSingleThreadExecutor():

      包含单个线程和无界队列的线程池,同一时间只能执行一个任务.

    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

    2、newFixedThreadPool():

      包含固定数量线程并共享无界队列的线程池;当所有线程处于工作状态,有新任务提交时,任务在队列中等待,直到一个线程变为可用状态.

    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())

    3、newCachedThreadPool():

      只有需要时创建新线程的线程池.

    new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())

    4、newScheduledThreadPool():

      基于任务调度的线程池,在给定的延迟后运行任务,或定期执行任务。功能与Timer类似,但是更强大、更灵活.

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }

      DelayedWorkQueue是一个无界队列,内部使用priorityQueue。所以maximumPoolSize这时候没有意义,ScheduledThreadPoolExecutor会把调度的任务ScheduledFutureTask放到DelayQueue中.

    private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            private final long sequenceNumber;//表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
    
             private long time;//任务要被执行的时间
            private final long period;//表示任务执行的间隔周期
    }

      DelayQueue会对队列中的ScheduledFutureTask进行排序。time小的排在前面。如果time相同,比较sequenceNumber,序列号小的证明是先提交的要先执行.

    任务执行步骤:

    说明:

      1、DelayQueue通过DelayQueue.take()获取已到期的ScheduledFutureTask。到期是指time>=当前时间.

      2、执行获取的ScheduledFutureTask.

      3、修改ScheduledFutureTask的time为下次要被执行的时间.

      4、把time修改后的ScheduledFutureTask通过DelayQueue.add()放回DelayQueue中.

    ScheduledExecutorService:

      这是ExecutorService的一个子接口,增加了调度任务的方法。

    ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    
    Future<Double> future = executor.schedule(callableTask, 2, TimeUnit.MILLISECONDS);
    
    executor.scheduleAtFixedRate(() -> System.out.println("Fixed Rate Scheduled"), 2, 2000, TimeUnit.MILLISECONDS);
    //schedule() 方法的参数指定执行的方法、延时和 TimeUnit
    //scheduleAtFixedRate() 方法延时 2ms执行任务,然后每 2s 重复一次。相似的,scheduleWithFixedDelay() 方法延时2毫秒后执行第一次,然后在上一次执行完成2秒后再次重复执行。

    ThreadPoolExecutor:

      这个线程池的实现增加了配置参数的能力。创建ThreadPoolExecutor对象最方便的方式就是通过Executors工厂方法:

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
    maximumPoolSize <= 0 ||
    maximumPoolSize < corePoolSize ||
    keepAliveTime < 0)
    throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
    }

    构造器参数:

    1、corePoolSize:

      核心池的大小,默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中.

    2、maximumPoolSize:

      线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。如果Queue满了,只能创建新的线程去执行.

    任务,但是如果是无界Queue,这个参数不起作用

    3、keepAliveTime:

      线程池的工作线程处于空闲之后,保持存活的时间。当任务很多,但是执行时间较多的时候,可以调高这个参数,提高线程利用率.

    4、unit:

      参数keepAliveTime的时间单位,取值格式如下.

    TimeUnit.DAYS;//
    TimeUnit.HOURS;//小时
    TimeUnit.MINUTES;//分钟
    TimeUnit.SECONDS;//
    TimeUnit.MILLISECONDS;//毫秒
    TimeUnit.MICROSECONDS;//微妙
    TimeUnit.NANOSECONDS;//纳秒

    5、workQueue:

      一个阻塞队列,用来存储等待执行的任务。无界队列和有界队列影响最大线程数是否起作用.

    一般来说,这里的阻塞队列有以下几种选择:

      1、ArrayBlockingQueue:基于数组的有界阻塞队列,FIFO.

      2、LinkedBlockingQueue:基于链表的阻塞队列,FIFO,吞吐量通常高于ArrayBlockingQueue.

      3、SynchronousQueue:不存储元素的阻塞队列,每个add操作必须等之前线程调用remove操作,否则add一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue.

      4、PriorityBlockingQueue:一个具有优先级的无阻塞队列.

    一般使用LinkedBlockingQueue和SynchronousQueue。线程池的排队策略与BlockingQueue有关。

    6、threadFactory:线程工厂,主要用来创建线程

    7、handler:

      表示当拒绝处理任务时的策略(Queue和ThreadPool都满了之后,线程池处于饱和状态),有以下四种取值:

      1、ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。

      2、ThreadPoolExecutor.DiscardPolicy: 也是丢弃任务,但是不抛出异常。

      3、ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程).

      4、ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务.

    当然也可以自定义策略 

    线程池执行流程:

    ThreadPoolExecutor执行示意图:

     

     线程池执行源码execute:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
           int c = ctl.get();
             if (workerCountOf(c) < corePoolSize) {//当前线程数小于核心线程数
                if (addWorker(command, true))//开启新的工作线程,并将Runnable传入,作为第一个要执行的任务
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//线程池处于Running状态,且线程数大于等于核心线程数,将Runnable加入到workQueue
                int recheck = ctl.get();
    //重新检查,如果非Running状态,将Runnable从workQueue取出,因为可能在上一步的过程中,线程被关闭,然后交给RejectedExecutionHandler调用rejectedExecution方法,拒绝执行此任务
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)//如果线程池线程数量为0,则创建一条新线程执行
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))//如果线程池处于非RUNNING状态或将Runnable添加到队列失败,则拒绝执行此任务
    
                reject(command);
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    ctl:

      高3位用来表示线程池的状态(runState),低29位用来表示工作线程的个数(workerCnt),因为线程池一共有5种状态,所以需要3位来表示,最终获取线程池的最新运行状态,线程数量.

    新增线程addWorker:

    private boolean addWorker(Runnable firstTask, boolean core) {//firstTask线程池第一个要执行的线程,core是否核心线程
            retry:
            for (;;) {//外层的循环不断判断线程池状态
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 线程池shutdown状态,Runnable为null,workQueue为空,返回false,说明线程添加失败
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {//内层循环判断线程池容量
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))//增加线程的个数,如果成功,返回两层for循环
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
        //这里是真的创建线程的地方
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);//创建worker对象,把Runnable传入
                final Thread t = w.thread;//从worker得到线程
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();//加锁
                    try {
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck是否启动状态
                                throw new IllegalThreadStateException();
                            workers.add(w);//把worker加入workers集合
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {//thread启动,新增线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    worker:包含了一个firstTask和一个Runnable.

    执行任务runWorker():

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;//取出首个需要执行的任务
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {//不断循环获取任务
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //检查状态
                wt.interrupt();
    try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//执行worker中Runnable的任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally {//把任务置空,下次又可以循环重复使用 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

    获取任务getTask():

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary. 或者线程池为shutdown,线程数减一
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);//线程数
    
                // allowCoreThreadTimeOut参数作用:是否允许核心线程超时策略,默认false,或者当前线程数大于核心线程数,返回true
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {//根据前面timed参数,决定从workQueue中获取线程的方式,true,poll限时等待,false,一直阻塞,如果获取不到
                    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                    if (r != null)//返回Runnable
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    private final BlockingQueue<Runnable> workQueue;//workQueue为FIFO队列

    执行execute()的流程:

      1、判断线程数量、线程池状态,都符合通过通过addWork()创建新的线程.

      2、addWork()经过判断符合,将Runnable添加到Worker,执行thread.start(),实际上执行的是Worker中run(),继而执行runWorker().

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
    
            private static final long serialVersionUID = 6138294804551838833L;
    
            final Thread thread;
    
            Runnable firstTask;
    
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
    
            public void run() {
                runWorker(this);
            }
        }

      3、runWorker()中通过getTask()不断获取任务,知道获取到,然后执行.

    submit():

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    用于提交需要返回值的任务。返回一个future对象,通过这个对象判断是否执行成功.

    public static void main(String[] args){
            retry:
            for (;;) {
                for (;;) {
                    break retry;
                }
            }
            ExecutorService service = null;
            try {
                service = Executors.newSingleThreadExecutor();
                Thread thread = new Thread(new TestUnit());
                Future future = service.submit(thread);
                Object s = future.get();//会一直阻塞,知道获取结果
                future.get(1,TimeUnit.SECONDS);//阻塞一段时间返回
            } catch (InterruptedException e) {
                e.printStackTrace();//
            } catch (TimeoutException e) {
                e.printStackTrace();//
            } catch (ExecutionException e) {
                e.printStackTrace();//处理无法执行的异常
            } finally {
                service.shutdown();//通过interrupt()中断线程
            }
        }

    线程池状态:

      RUNNING: 接收新任务,并执行队列中的任务.

      SHUTDOWN: 不接收新任务,但是执行队列中的任务.

      STOP: 不接收新任务,不执行队列中的任务,中断正在执行中的任务.

      TIDYING: 所有的任务都已结束,线程数量为0,处于该状态的线程池即将调用terminated()方法.

      TERMINATED: terminated()方法执行完成.

    ForkJoinPool:

      jdk1.7另一个线程池的实现是 ForkJoinPool 类。它实现了 ExecutorService 接口,并且是 Java 7 中 fork/join 框架的重要组件。

    fork/join 框架基于“工作窃取算法”。适用于任务创建子任务的情况,或者外部客户端创建大量小任务到线程池。

    ForkJoinPool工作流程如下:

      1、创建 ForkJoinTask 子类

      2、根据某种条件将任务切分成子任务

      3、调用执行任务

      4、将任务结果合并

    实例化对象并添加到池中,创建一个 ForkJoinTask,你可以选择 RecursiveAction 或 RecursiveTask 这两个子类,后者有返回值。

    ThreadPoolExecutor 与 ForkJoinPool 对比:

      初看上去,似乎 fork/join 框架带来性能提升。但是这取决于你所解决问题的类型。当选择线程池时,非常重要的一点是牢记创建、管理线程以及线程间切换执行会带来的开销。

      ThreadPoolExecutor 可以控制线程数量和每个线程执行的任务。这很适合你需要在不同的线程上执行少量巨大的任务。

      相比较而言,ForkJoinPool 基于线程从其他线程“窃取”任务。正因如此,当任务可以分割成小任务时可以提高效率。

    为了实现工作窃取算法,fork/join 框架使用两种队列:

      当线程执行完自己任务队列中的任务,它们试图从其他队列获取任务。为了使这一过程更加高效,线程任务队列使用双端队列(double ended queue)数据结构,一端与线程交互,另一端用于“窃取”任务。

    来自The H Developer的图很好的表现出了这一过程:

     

     

    和这种模型相比,ThreadPoolExecutor 只使用一个主要队列。

    最后要注意的一点 ForkJoinPool 只适用于任务可以创建子任务。否则它和 ThreadPoolExecutor 没区别,甚至开销更大。


    线程池的缺点:

      1、用的线程池过大或过小:如果线程池包含太多线程,会明显的影响应用的性能;另一方面,线程池太小并不能带来所期待的性能提升

      2、可能发生死锁

      3、等待执行时间很长的任务:为了避免长时间阻塞线程,你可以指定最大等待时间,并决定过期任务是拒绝处理还是重新加入队列。

    结论:

      线程池有很大优势,简单来说就是可以将任务的执行从线程的创建和管理中分离。另外,如果使用得当,它们可以极大提高应用的性能。

      如果你学会充分利用线程池,Java 生态系统好处便是其中有很多成熟稳定的线程池实现。

    源码部分学习参考:https://www.jianshu.com/p/edab547f2710

    推荐书籍:《并发编程的艺术》《并发编程实战》,前一本比较通俗,适合面试,后一本老外写的,算是并发编程方面的神书,可以有一定基础进行阅读

  • 相关阅读:
    Hyper-V无法启动虚拟机因为虚拟机监控程序未运行
    SpringBoot项目中自动加载datasourceConfig配置导致启动失败
    redis 数据类型与命令
    Redis入门与安装,与配置
    MySQL 主从配置
    MySql 中的事务
    什么是Docker?
    window10下安装Docker
    Docker 常见命令
    原生SQL语句
  • 原文地址:https://www.cnblogs.com/huigelaile/p/10863672.html
Copyright © 2011-2022 走看看