zoukankan      html  css  js  c++  java
  • 线程池之ThreadPoolExecutor

    所属包:

    java.util.concurrent.ThreadPoolExecutor

     类关系:

    public class ThreadPoolExecutor extends AbstractExecutorService

    1. 继承关系

    ThreadPoolExecutor 继承了一个抽象类:AbstractExecutorService

    public abstract class AbstractExecutorService implements ExecutorService

    而这个AbstractExecutorService实现了一个接口:ExecutorService

    public interface ExecutorService extends Executor

    这个ExecutorService接口又继承了一个类:Executor

    public interface Executor

    可以看出:

    Executor是一个顶层接口,它的子接口ExecutorService继承了它(其实还有一个子接口: ScheduledExecutorService),抽象类AbstractExecutorService实现了这个子接口ExecutorService,最终ThreadPoolExecutor 继承了抽象类AbstractExecutorService并且同时实现了子接口ExecutorService。

    2. 构造方法 

    最简单的一个构造方法:

        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default thread factory and rejected execution handler.
         * It may be more convenient to use one of the {@link Executors} factory
         * methods instead of this general purpose constructor.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }

    有五个参数:

    corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
    maximumPoolSize - 池中允许的最大线程数 
    keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。 
    unit - keepAliveTime参数的时间单位 
    workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务

     实际上它调用了同类的另外一个构造方法,最后两个参数用的默认值

    另外一个构造方法:

        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }

    参数:

    corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut 
    maximumPoolSize - 池中允许的最大线程数 
    keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。 
    unit - keepAliveTime参数的时间单位 
    workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。 
    threadFactory - 执行程序创建新线程时使用的工厂 
    handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量 (拒绝策略)

    3. 如何使用

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class MyThreadPoolExecutor {
    
        /**
         * 使用有界队列:
         *  1、当线程数小于corePoolSize时,创建线程执行任务。
         *  2、当线程数大于等于corePoolSize并且workQueue没有满时,放入workQueue中
         *  3、线程数大于等于corePoolSize并且当workQueue满时,新任务新建线程运行,线程总数要小于maximumPoolSize
         *  4、当线程总数等于maximumPoolSize并且workQueue满了的时候执行handler的rejectedExecution。也就是拒绝策略。
         * 
         * ThreadPoolExecutor默认有四个拒绝策略:
         *  1、ThreadPoolExecutor.AbortPolicy()           直接抛出异常RejectedExecutionException
         *  2、ThreadPoolExecutor.CallerRunsPolicy()        直接调用run方法并且阻塞执行
         *  3、ThreadPoolExecutor.DiscardPolicy()           直接丢弃后来的任务
         *  4、ThreadPoolExecutor.DiscardOldestPolicy()    丢弃在队列中队首的任务
         *  
         * 
         *  
         */
        
        public static void main(String[] args) {
            
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,
                    2,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3));
            
            Runnable[] runs = new Runnable[6];
            for (int i = 0; i < runs.length; i++) {
                runs[i] = new MyTask(i);
            }
            
            pool.execute(runs[0]);    //线程1个,队列0个
            pool.execute(runs[1]);    //线程1个,队列1个
            pool.execute(runs[2]);    //线程1个,队列2个
            pool.execute(runs[3]);    //线程1个,队列3个
            pool.execute(runs[4]);    //线程2个,队列3个
            pool.execute(runs[5]);    //线程2个,队列3个,拒绝第六个
            
            pool.shutdown();
            
        }
    }

    线程:

    public class MyTask implements Runnable {
    
        private int id;
        
        public MyTask(int id) {
            this.id = id;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task-" + id);
        }
    
    }

    这个就是最简单的一个使用方法了。ps:最后那个(线程1个,队列0个....)指的是,你仅仅执行runs[0];runs[0]+runs[1];runs[0]+runs[1]+runs[2];....的时候,任务被放到哪里。

    但是推荐使用这种方式创建线程池:

    package cn.ying.thread.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class MyTest {
        
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(5);
            
            for (int i = 0; i < 10; i++) {
                pool.execute(new MyTask(i));
            }
            pool.shutdown();
        }
    
        public void note(){
            Executors.newCachedThreadPool();    //无界线程池,可以进行自动线程回收
    //        public static ExecutorService newCachedThreadPool() {
    //            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    //                                          60L, TimeUnit.SECONDS,
    //                                          new SynchronousQueue<Runnable>());//SynchronousQueue:长度为1的队列
    //        }
            Executors.newFixedThreadPool(10);    //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
    //        public static ExecutorService newFixedThreadPool(int nThreads) {
    //            return new ThreadPoolExecutor(nThreads, nThreads,
    //                                          0L, TimeUnit.MILLISECONDS,
    //                                          new LinkedBlockingQueue<Runnable>());
    //        }
            Executors.newScheduledThreadPool(10);    //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行
    //        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    //            return new ScheduledThreadPoolExecutor(corePoolSize);
    //        }
    //        public ScheduledThreadPoolExecutor(int corePoolSize) {
    //            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    //                  new DelayedWorkQueue());
    //        }
            Executors.newSingleThreadExecutor();    //创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程
    //        public static ExecutorService newSingleThreadExecutor() {
    //            return new FinalizableDelegatedExecutorService
    //                (new ThreadPoolExecutor(1, 1,
    //                                        0L, TimeUnit.MILLISECONDS,
    //                                        new LinkedBlockingQueue<Runnable>()));
    //        }
            
        }
    }

    ==============================更新

    4. 拒绝策略

    1. AbortPolicy(丢弃任务并抛出RejectedExecutionException异常)默认策略

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,
                    2,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3),
                    // 拒绝并抛出异常
                    new ThreadPoolExecutor.AbortPolicy());
    
            for (int i = 0; i < 6; i++){
                pool.execute(new Task("任务" + i));
            }
            pool.shutdown();
        }
    
        private static class Task implements Runnable{
            private String name;
    
            Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "===" + name);
            }
        }
    
    }

    运行:

    2. DiscardPolicy(丢弃任务,但是不抛出异常)

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,
                    2,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3),
                    // 拒绝但不抛出异常
                    new ThreadPoolExecutor.DiscardPolicy());
    
            for (int i = 0; i < 6; i++){
                pool.execute(new Task("任务" + i));
            }
            pool.shutdown();
        }
    
        private static class Task implements Runnable{
            private String name;
    
            Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "===" + name);
            }
        }
    
    }

    运行:

     3. DiscardOldestPolicy(丢弃队列最前面的任务,然后重新提交被拒绝的任务)

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,
                    2,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3),
                    // 丢弃队列中最前面的任务,然后执行被拒绝的任务
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            for (int i = 0; i < 6; i++){
                pool.execute(new Task("任务" + i));
            }
            pool.shutdown();
        }
    
        private static class Task implements Runnable{
            private String name;
    
            Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "===" + name);
            }
        }
    
    }

    运行:

    4. CallerRunsPolicy(由提交任务的线程处理)

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,
                    2,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3),
                    // 由提交任务的线程直接执行此任务
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            for (int i = 0; i < 6; i++){
                pool.execute(new Task("任务" + i));
            }
            pool.shutdown();
        }
    
        private static class Task implements Runnable{
            private String name;
    
            Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "===" + name);
            }
        }
    
    }

    运行:

     5. 自己处理

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
    
        public static void main(String[] args) throws Exception {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,
                    2,
                    0L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(3),
                    // 自定义处理
                    (Runnable r, ThreadPoolExecutor executor) -> {
                        System.out.println("自定义处理");
                    });
    
            for (int i = 0; i < 6; i++){
                pool.execute(new Task("任务" + i));
            }
            pool.shutdown();
        }
    
        private static class Task implements Runnable{
            private String name;
    
            Task(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "===" + name);
            }
        }
    
    }

    运行:

  • 相关阅读:
    数学之道-微积分
    mysql join实现方式
    python pip源配置
    python使用tesseract-ocr完成验证码识别
    Linux和Windows下查看环境变量方法对比
    把大象装进冰箱的N种方法
    mysql 取当前日期对应的周一或周日
    window 安装 Twisted 遇到的问题
    Java泛型
    Android之Adapter用法总结
  • 原文地址:https://www.cnblogs.com/LUA123/p/7248530.html
Copyright © 2011-2022 走看看