zoukankan      html  css  js  c++  java
  • 多线程JDK工具篇-线程池原理

    12.1 为什么要使用线程池

      一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。】

    使用线程池主要有以下三个原因:

    1. 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
    2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
    3. 可以对线程做统一管理。

      

    12.2 线程池的原理

    Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。

    我们先看看ThreadPoolExecutor类。

       public ThreadPoolExecutor(int corePoolSize,//核心线程,创建后就一直存在,空闲也不会被销毁
                                  int maximumPoolSize,//线程数最大值
                                  long keepAliveTime,//非核心线程存活时长,非核心线程空闲到一定程度就会被销毁。
                                  TimeUnit unit,//keepAliveTime的单位
                                  BlockingQueue<Runnable> workQueue,//线程池的等待执行的任务(Runnanble对象)队列
                                  ThreadFactory threadFactory,//可选参数,线程工厂默认创建一个
                       RejectedExecutionHandler handler//可选参数,拒绝处理策略) 
    

      

    workQueue,工作队列,有几种常见的队列

      1.ArrayBlockingQueue 

      数组阻塞队列,限定长度的队列(必须声明队列长度)。

      2.LinkedBlockingQueue

      链表阻塞队列,队列的默认长度默认值为Integer.MAX。

      3.SynchronousQueue

      同步队列。没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

      4.DelayQueue

      延迟队列。队列中的元素需要实现Delayed()方法,线程需要等待延迟时间结束后才能获取队列的任务执行。

    RejectedExecutionHandler handler拒绝处理策略

      线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :

    1. ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。

    2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。

    3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。

    4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

    ThreadPoolExecutor的状态

    线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

    故线程池也有自己的状态。ThreadPoolExecutor类中定义了一个volatile int变量runState来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。

      • 线程池创建后处于RUNNING状态。

      • 调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。

      • 调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

      • 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

        ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。

      • 线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

    常见的4种线程池

    1.FixedThreadPool 固定长度的线程池

    可以用来控制并发数量,如果没有空闲的核心线程,任务会在队列中等待。核心线程数=最大线程数。采用LinkedBlockingQueue(队列为Integer.MAX,理论上无限大)。

       public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    2.CachedThreadPool 缓存线程池

    核心线程数为0,非核心线程数最大值为Integer.MAX,理论上不限。线程存活时间为60秒,使用SynchronousQueue。一个任务进来,队列将任务提交给线程执行,如果没有空闲的线程则创建新的线程执行,线程空闲存活时间为60秒。一定程度上复用了线程。

      public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    3.SingleThreadPool 单线程线程池

     核心线程数为1,非核心线程数为0,采用LinkedBlocakingQueue,与FixedThreadPool一致。

       public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }  

    4.ScheduledThreadPool 周期线程池

    定长线程池,支持周期和定时任务,采用DelayQueue,有核心线程数和非核心线程数,线程数最大值也为Integer.MAX

        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory);
        } 

    5. WorkStealingPool

    工作窃取线程池。采用Fork/Join线程池实现的,空闲的线程会去窃取其他线程的工作队列(于队尾)。

        public static ExecutorService newWorkStealingPool(int parallelism) {
            return new ForkJoinPool
                (parallelism,
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }

     

     线程复用

    将线程封装成工作线程,采用while循环一直去Queue获取task,调用task的run方法执行,直到线程中断或者Queue中的task为0。

       final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {//while循环
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();//执行task实例的run方法,此时只是调用实例的方法,不会触发创建线程。
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

      来一个例子

        public static void main(String[] args){
    new MyThread().run();
    System.out.println(Thread.currentThread().getName() +":main");
    }

    public static class MyThread extends Thread{
    @Override
    public void run(){
    try {
    Thread.currentThread().sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + ":MyThread");
    }
    }

      输出如下,阻塞了,调用了当前的主线程来执行。

    main:MyThread
    main:main

      

  • 相关阅读:
    LinuxMCE
    qBittorrent 0.9.0
    Exaile 0.2.9
    GAdminHttpd:图形化的 Apache 打点对象
    FBReader-电子书阅读对象
    CSSED:Linux 下 Web 拓荒者的 CSS 编纂利器
    Canorus:乐谱编辑软件
    AutoScan-收集监视及办理器械
    Lunar Applet:在桌面表现阴历
    Totem 2.18.1
  • 原文地址:https://www.cnblogs.com/knsbyoo/p/14036954.html
Copyright © 2011-2022 走看看