zoukankan      html  css  js  c++  java
  • java线程池ThreadPoolExecutor

    使用线程池的好处:

    • 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率
    • 线程并发数量过多,抢占系统资源从而导致阻塞
    • 对线程进行一些简单的管理

    1、ThreadPoolExecutor的一个构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
    }

    参数说明:

    corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
    maximumPoolSize:线程池中允许的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用的是无界的队列,那么这个参数没有意义了。
    keepAliveTime:线程池工作线程空闲后,保持的存活时间。
    unit :上面时间的单位
    workQueue:执行任务在执行之前加入的queue。该队列仅保存由execute方法提交的Runnable任务。
    threadFactory:创建新线程的工厂方法。可以通过线程工厂给每个创建的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字
    handler:当队列满时,线程处于饱和策略。

    2、工作流程

    如果使用无界队列很简单,开启核心线程数,多余的全部阻塞直到内存耗尽,故不建议使用。
    如果使用有界队列,工作流程如下:
      * 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
      * 若大于corePoolSize,则会将任务加入队列,
      * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
      * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。

    3、handler饱和策略

    官方给出的饱和策略有如下几种:

      AbortPolicy:直接抛异常。
      DiscardPolicy:顾名思义,直接丢弃多余的任务。
      DiscardOldestPolicy:丢弃队列中最老的任务,并且重试exexute方法。
      CallerRunsPolicy:在调用线程中执行丢弃的任务。

    也可以自定义策略来处理,例如:

    public class MyRejected implements RejectedExecutionHandler {
        public MyRejected(){
        }
    
        //拒绝策略实际调用的方法处理丢弃的任务,可以做一些日志处理
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义饱和策略,当前被拒绝任务为:" + r.toString());
        }
    }

    //使用
    ...
     ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,              //coreSize
                    200,              //MaxSize
                    60,             //60
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3)         //指定一种队列 (有界队列)
                    , new MyRejected()
                    , new DiscardOldestPolicy()
            );
    ...

    4、队列

    ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照FIFO(先进先出)原则对元素进行排序
    LinkedBlockingQueue:是一个基于链表结构的有界阻塞队列,此队列按照FIFO排序元素,吞吐量高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool(n)使用了此队列
    PriorityBlockingQueue:一个具有优先级的无限阻塞队列
    SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了此队列

    5、示例

    任务线程:

    public class MyTask implements Runnable {
    
        private int taskId;
        private String taskName;
    
        public MyTask(int taskId, String taskName){
            this.taskId = taskId;
            this.taskName = taskName;
        }
    
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("run taskId =" + this.taskId);
                Thread.sleep(5*1000);
                //System.out.println("end taskId =" + this.taskId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }       
        }
    
        public String toString(){
            return Integer.toString(this.taskId);
        }
    
    }

    线程池执行器:

    public class UseThreadPoolExecutor1 {
    
    
        public static void main(String[] args) {
            /**
             * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
             * 若大于corePoolSize,则会将任务加入队列,
             * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
             * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
             * 
             */ 
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,              //coreSize
                    2,              //MaxSize
                    60,             //60
                    TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(3)         //指定一种队列 (有界队列)
                    //new LinkedBlockingQueue<Runnable>()
                    //, new MyRejected()
                    , new DiscardOldestPolicy()
                    );
    
            MyTask mt1 = new MyTask(1, "任务1");
            MyTask mt2 = new MyTask(2, "任务2");
            MyTask mt3 = new MyTask(3, "任务3");
            MyTask mt4 = new MyTask(4, "任务4");
            MyTask mt5 = new MyTask(5, "任务5");
            MyTask mt6 = new MyTask(6, "任务6");
    
         //ThreadPoolExecutor.execute(Runnable command)方法即可向线程池内添加一个任务。 pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6);
         //关闭线程池 pool.shutdown(); } }

    执行结果:

    run taskId =5
    run taskId =1
    run taskId =3
    run taskId =4
    run taskId =6

    分析:
    使用有界阻塞队列,先为1开启一个核心线程,然后为2 3 4入队,然后
    为5开启一个线程(最大线程),本来应该是舍弃6,但是指定了舍弃策略
    是舍弃最老的,也就是阻塞队列队头元素2,所以舍弃2而执行6

    6、常见四种线程池

    1.可缓存线程池CachedThreadPool()
    源码:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    }    

    这种线程池内部没有核心线程,线程的数量是有没限制的。在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。

    创建方法:

    ExecutorService mCachedThreadPool = Executors.newCachedThreadPool();

    使用:

    private void startDownload(final ProgressBar progressBar, final int i) {
            mCachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    int p = 0;
              //每隔10s progressBar.setMax(
    10); while (p < 10) { p++; progressBar.setProgress(p); Bundle bundle = new Bundle(); Message message = new Message(); bundle.putInt("p", p); bundle.putString("ThreadName", Thread.currentThread().getName()); message.what = i; message.setData(bundle); mHandler.sendMessage(message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); }

    2.定长线程池FixedThreadPool 
    源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    }

    该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。

    如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

    创建方法:

    //nThreads => 最大线程数即maximumPoolSize
    ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads);
    //threadFactory => 创建线程的方法,用得少
    ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

    使用:

    private void startDownload(final ProgressBar progressBar, final int i) {
        mFixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                //....逻辑代码自己控制
            }
         });
    }

    3.单线程线程池SingleThreadPool

    源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
    }    

    有且仅有一个工作线程执行任务,所有任务按照指定顺序执行,即遵循队列的入队出队规则


    创建方法:

    ExecutorService mSingleThreadPool = Executors.newSingleThreadPool();

    使用同FixedThreadPool 

    4.定时任务线程池ScheduledThreadPool
    源码:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    //ScheduledThreadPoolExecutor():
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
            DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
            new DelayedWorkQueue());
    }

    DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下。

    不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE。
    这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。

    创建方法:

    //nThreads => 最大线程数即maximumPoolSize
    ExecutorService mScheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

    使用:

    //表示在3秒之后开始执行我们的任务。
    mScheduledThreadPool.schedule(new Runnable() {   @Override public void run() { //.... } }, 3, TimeUnit.SECONDS);
    //延迟3秒后执行任务,从开始执行任务这个时候开始计时,每5秒执行一次不管执行任务需要多长的时间。 
    
    mScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //....
        }
    },3, 5, TimeUnit.SECONDS);

    7、其他

      * 执行线程次提交的任务通过execute方法和submit方法(submit提供了重载方法)。execute不返回值,而submit返回值,并且可以通过返回的Future对象的get方法查看返回值。

      * 使用无界BlockingQueue:与有界队列相比,除非系统资源耗尽,否则无界的队列不存在任务入队失败的情况。当有新任务到来,系统的线程小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的队列加入,而没有空闲的线程资源,则队列直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。所以一般不建议使用无界队列,另外使用无界队列的时候,最大线程数,拒绝策略等都失去了意义

      *阿里编译器建议线程池创建时最好能手动进行创建,给出理由:

        newFixedThreadPoolExecutor和newSingleThreadPoolExecutor:主要问题是堆积请求的处理队列可能会消耗非常大的内存,甚至OOM     
        newCachedThreadPoolExecutor和newScheduledThreadPoolExecutor:主要问题是线程数最大是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM

    源码参照Github

  • 相关阅读:
    Ghost Button制作教程及设计趋势分析
    onhashchange事件--司徒正美
    window.location.hash属性介绍
    优质UI的7条准则(一)
    当在浏览器地址栏输入一个网址的时候,究竟发生了什么?
    全球最快的JS模板引擎
    眨眼登录表单
    DIV+CSS规范命名
    es6--export,import
    es6--class
  • 原文地址:https://www.cnblogs.com/kingsonfu/p/10393230.html
Copyright © 2011-2022 走看看