zoukankan      html  css  js  c++  java
  • Java并发编程总结5——ThreadPoolExecutor

    一、ThreadPoolExecutor介绍

    在jdk1.8中,构造函数有4个。以

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)为例:

    1、corePoolSize: 核心线程池大小

    2、maximumPoolSize: 最大线程池大小

    3、keepAliveTime: 当线程池中的线程数大于corePoolSize时, 多余空闲线程等待新任务的最长时间, 超过这个时间后多余线程终止

    4、unit: 时间单位, 比如毫秒, 纳秒等

    5、workQueue: 阻塞队列

    6、threadFactory: 创建线程工厂, 可以方便的创建线程, 可以自定义; 默认为Executors.DefaultThreadFactory

    7、handler: 饱和策略, 默认为AbortPolicy

     

    定义一个完整的线程池可以这么写:

        private ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(10, 100, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

     

    线程池的主要处理流程如下:

    1、提交任务到线程池,判断核心线程池(corePoolSize)是否已满? 没满,创建工作线程执行任务;满了,进入下个流程。

    2、线程池判断工作队列(workQueue)是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

    3、判断整个线程池(maximumPoolSize)是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略(RejectedExecutionHandler)来处理这个任务。

     

    二、饱和策略 RejectedExecutionHandler

    在ThreadPoolExecutor类中定义了4种RejectedExecutionHandler类型:

    1、AbortPolicy: 默认策略,直接抛出RejectedExecutionException异常。

    2、CallerRunsPolicy:只用调用者所在线程来运行任务。

    3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

    4、DiscardPolicy:不处理,丢弃掉。

    也可以自定义饱和策略,比如将无法处理的新任务加入日志等,只需要实现RejectedExecutionHandler接口即可。

     

    三、阻塞队列BlockingQueue

        /**
         * ArrayBlockingQueue: 由数组结构组成的有界阻塞队列, 队列遵循FIFO
         */
        private BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10);
    
        /**
         * LinkedBlockingQueue: 由链表结构组成的阻塞队列, FIFO
         * LinkedBlockingDeque: 由链表结构组成的双向阻塞队列, 构造方法和LinkedBlockingQueue类似
         * public LinkedBlockingQueue(int capacity)     //有界阻塞队列,队列最大值为capacity
         * public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }        //无界阻塞队列,队列最大值为Integer.MAX_VALUE
         * 通过Executors.newFixedThreadPool(int nThreads)创建的线程池中采用的是无界LinkedBlockingQueue
         * 对于put和take操作, 内部采用了不同的锁: putLock, takeLock, 而ArrayBlockingQueue内部只有一把锁
         */
        private BlockingQueue<Thread> lbq = new LinkedBlockingQueue<Thread>(10);
        private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
        private BlockingQueue<String> lbd = new LinkedBlockingDeque<String>();
    
        /**
         * PriorityBlockingQueue: 支持优先级排序的有界阻塞队列
         * public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
         */
        private BlockingQueue<String> pbq = new PriorityBlockingQueue(100, new Comparator<String>() {
            public int compare(String o1, String o2) {
                return o1.compareTo(o2);        //升序排列
            }
        });
    
        /**
         * DelayQueue: 一个使用优先级队列实现的无界阻塞队列, 支持延时获取元素, 适用于缓存系统的设计以及定时任务调度。
         * 内部队列采用PriorityQueue, private final PriorityQueue<E> q = new PriorityQueue<E>();
         * 队列元素需要实现Delayed接口, class DelayQueue<E extends Delayed> extends AbstractQueue<E>
         */
    
        /**
         * SynchronousQueue: 不存储元素的阻塞队列
         */
    
        /**
         * LinkedTransferQueue: 由链表结构组成的实现了TransferQueue接口的无界阻塞队列
         * transfer方法: 如果当前消费者正在等待接收元素(take()或poll(long timeout, TimeUnit unit))方法, 生产者传入的元素可以直接传递给消费者, 而不放入队列
         */
        BlockingQueue<String> ltq = new LinkedTransferQueue<String>();

    四、通过Executors创建线程池

           /**
             * 创建单个线程, 适用于需要保证顺序执行任务的场景
             * return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
             * BlockingQueue采用无界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义
             */
            Executors.newSingleThreadExecutor();
    /** * 创建固定线程, 适用于需要限制当前线程数量, 负载比较重的服务器 * return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); * 采用无界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义 */ Executors.newFixedThreadPool(10);
    /** * 根据需要创建线程, 适用于执行很多的短期异步任务的小程序, 或者负载较轻的服务器 * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); * 初始线程为0, 采用SynchronousQueue阻塞队列, 如果生产任务的速度低于消费的速度, 空闲60s的线程会被终止; 如果生产任务的速度持续高于消费速度, 则会不断创建新线程 */ Executors.newCachedThreadPool();
    /** * 在给定的延迟之后运行任务, 或者定期执行任务 * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); * 采用无界DelayQueue队列, 因此maximumPoolSize、keepAliveTime参数无意义 */ Executors.newScheduledThreadPool(10);

     

     

  • 相关阅读:
    [Real World Haskell翻译]第24章 并发和多核编程 第一部分并发编程
    [Real World Haskell翻译]第22章 扩展示例:Web客户端编程
    [Real World Haskell翻译]第27章 网络通信和系统日志 Sockets and Syslog
    druid 解密
    git clone 所有远程分支到本地
    十、Delphi 把Json转成Delphi实体类
    XML External Entity attack/XXE攻击
    大家好!
    XXE攻防——XML外部实体注入
    ubuntu安装Go环境
  • 原文地址:https://www.cnblogs.com/everSeeker/p/5632450.html
Copyright © 2011-2022 走看看