zoukankan      html  css  js  c++  java
  • java 线程池 理解

    1.  前言

    无限制创建线程的不足:

    1) 线程生命周期开销高;

    2) 资源消耗大,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程占用许多内存,给垃圾回收器带来压力(频繁 stop the world)。所以,如果已经拥有足够多的线程使所有CPU保持忙碌状态,那么创建再多的线程反而会降低性能。

    3) 稳定性。可创建线程的数量存在一定限制。每个都会维护两个执行栈,一个用于java代码,另一个用于原生代码。通常JVM在默认情况下生成一个复合栈,大约为0.5MB。如果无限制地创建线程,破坏了系统对线程的限制,就很可能抛出OutOfMemoryError异常,使得系统处于不稳定状态。

    2.   Executor框架

    2.1.      简单使用

    public class ExecutorTest {
    
     
    
           private static final int NUMBERS = 100;
    
           private static final Executor EXECUTOR =
    
                         Executors.newFixedThreadPool(NUMBERS);
    
          
    
           public void processRequst() throws IOException {
    
                 
    
                  ServerSocket serverSocket = new ServerSocket(80);
    
                  while (true) {
    
                         final Socket conn = serverSocket.accept();
    
                         Runnable task = new Runnable() {
    
                               
    
                                @Override
    
                                public void run() {
    
                                       System.out.println("-----------process request----------");
    
                                }
    
                         };
    
                         EXECUTOR.execute(task);
    
                  }
    
           }
    
    }
    View Code

    2.2.      线程池

    线程池通过调用Executors的静态工厂方法的四种创建方式:

    1) newFixedThreadPool。创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,如果某个线程发生异常而结束那么线程池会补充一个线程。

    /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * {@code nThreads} threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    }
    View Code

    2) newCachedThreadPool。创建一个可缓存的线程池。如果线程池的当前规模超过了处理需求时,将回收空闲的线程,而当需求增加时,则会添加新的线程,线程池的规模不存在任何限制。

     /**
         * Creates a thread pool that creates new threads as needed, but
         * will reuse previously constructed threads when they are
         * available.  These pools will typically improve the performance
         * of programs that execute many short-lived asynchronous tasks.
         * Calls to {@code execute} will reuse previously constructed
         * threads if available. If no existing thread is available, a new
         * thread will be created and added to the pool. Threads that have
         * not been used for sixty seconds are terminated and removed from
         * the cache. Thus, a pool that remains idle for long enough will
         * not consume any resources. Note that pools with similar
         * properties but different details (for example, timeout parameters)
         * may be created using {@link ThreadPoolExecutor} constructors.
         *
         * @return the newly created thread pool
         */
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    View Code

    3) newSingleThreadExecutor。这是一个单线程的executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程替代。该方式能确保依照任务在队列中的顺序来串行执行(FIFO,LIFO,优先级)。

    /**
         * Creates an Executor that uses a single worker thread operating
         * off an unbounded queue. (Note however that if this single
         * thread terminates due to a failure during execution prior to
         * shutdown, a new one will take its place if needed to execute
         * subsequent tasks.)  Tasks are guaranteed to execute
         * sequentially, and no more than one task will be active at any
         * given time. Unlike the otherwise equivalent
         * {@code newFixedThreadPool(1)} the returned executor is
         * guaranteed not to be reconfigurable to use additional threads.
         *
         * @return the newly created single-threaded Executor
         */
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    View Code

    4) newScheduledThreadPool。创建一个固定长度的线程池,并且以延迟或定时的方式执行任务

     /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         */
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
     /**
         * Creates a new {@code ScheduledThreadPoolExecutor} with the
         * given core pool size.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         */
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    View Code

    创建线程池的方法最终都是创建了一个ThreadPoolExecutors实例,该类的构造方法如下

     /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    View Code

    参数说明(摘抄自JDK1.6参考文档)

    1) 核心和最大池大小(corePoolSize和maximumPoolSize)

    ThreadPoolExecutor 将根据 corePoolSiz和 maximumPoolSize设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)setMaximumPoolSize(int) 进行动态更改。

    2) 保持活动时间(keepAliveTime)

    如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

    TimeUnit为超时时间单位。

    3)阻塞队列(BlockingQueue)

    所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

    • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
    • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
    • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

    排队有三种通用策略:

    1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。SynchronousQueue 内部没有容量,但是由于一个插入操作总是对应一个移除操作,反过来同样需要满足。那么一个元素就不会再SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方向。
    2. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时,新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
    3. 有界队列。当使用有限的maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

    4) 创建新线程(ThreadFactory)

    使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

    5) 被拒绝的任务(RejectedExecutionHandler)

    当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下,execute 方法都将调用其 RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:

    1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
    2. ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    3. ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
    4. ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

    定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。

    2.3.     Executor生命周期

    我们知道,JVM只有在所有线程全部终止后才会退出。所有,如果我们无法正确地关闭Executor,JVM将无法结束。为了解决执行服务的生命周期问题,ExecutorService继承Executor接口,添加了一些用于生命周期管理的方法。ExecutorService的生命周期有三种状态:运行、关闭和已终止。

    public interface ExecutorService extends Executor {
    
        /**
         * Initiates an orderly shutdown in which previously submitted
         * tasks are executed, but no new tasks will be accepted.
         * Invocation has no additional effect if already shut down.
         * 执行平缓的关闭过程,不再接受新的任务,同时等待已经提交的任务执行完成——包括哪些还未开始执行的任务
         */
        void shutdown();
    
        /**
         * Attempts to stop all actively executing tasks, halts the
         * processing of waiting tasks, and returns a list of the tasks
         * that were awaiting execution.
         * 尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务
         */
        List<Runnable> shutdownNow();
    
        /**
         * Returns {@code true} if this executor has been shut down.
         * 查询 ExecutorService 是否已经关闭
         */
        boolean isShutdown();
    
        /**
         * Returns {@code true} if all tasks have completed following shut down.
         * Note that {@code isTerminated} is never {@code true} unless
         * either {@code shutdown} or {@code shutdownNow} was called first.
         *
         * @return {@code true} if all tasks have completed following shut down
         * 
         * 查询 ExecutorService 是否已经终止
         */
        boolean isTerminated();
    
        /**
         * Blocks until all tasks have completed execution after a shutdown
         * request, or the timeout occurs, or the current thread is
         * interrupted, whichever happens first.
         *
         * 等待ExecutorService进入终止状态
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /*其他用于提交任务的方法……*/
    }
    View Code

    3.   可利用的并行性

    3.1.     异构任务中的并行

    采用future和callable携带任务结果,长时间执行的任务可以先进行计算,在之后通过future取得计算结果。

    示例程序

    /*
     * 网页数据加载并行的可能性,假设网页只包含文本和图像两种数据
     */
    public class CompletionTest {
        
        class ImageData{
            // 属性....
        }
        
        // 从序列化的数据中加载图像
        public ImageData loadFrom(String source) {
            return new ImageData();
        }
    
        /**
         * 单线程模式
         * @param sequence 序列化后的网页数据
         */
        public void loadWithSingleThread(CharSequence source) {
            System.out.print("加载文本");
            List<ImageData> list = new ArrayList<>();
            
            // 从 source 解析图像数据并加入到 list 中  .....
            // loadImage(source)
            
            for (ImageData imageData : list) {
                System.out.println("图像加载完成" + imageData);
            }
        }
        
        private final ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        
        /*
         * 单线程加载CPU利用率低,如果程序依赖于长时间的 io(当前从网络加载图像就是)那么将很费时间
         * 结合 futureTask 预加载图像 
         */
        public void loadInFuture(CharSequence source) {
            
            Callable<List<ImageData>> task = new Callable<List<ImageData>>() {
    
                @Override
                public List<ImageData> call() throws Exception {
                    List<ImageData> result = new ArrayList<>();
                    
                    /*
                     * 加载图像数据到 result
                     * loadImageFrom source to list.....
                     */
                    
                    return result;
                }
            };
            
            Future<List<ImageData>> future = executorService.submit(task);
            
            /*
             * loadText from source.....
             */
            System.out.println("loading text");
            
            try {
                List<ImageData> imageDatas = future.get();
                for (ImageData imageData : imageDatas) {
                    System.out.println("图像数据" + imageData);
                }
            } catch (InterruptedException e) {
                // 抛出中断异常,重新设置线程的中断状态
                Thread.currentThread().interrupt();
                // 中断了,结果已不需要
                future.cancel(true);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        
        /*    
         * 采用 future 来预加载图像在一定程度上提供了并发性,但在本问题中效率仍比较低,因为我们采用的是一次性加载完图像
         * 再返回,而相对于加载文本来说,图像加载速度要低很多,在本问题中几乎可以说效率与串行差别不大,那怎么改进?
         * 为每一图片设置一个相应的 future计算任务,然后循环操作,每计算完就直接加载,那样用户看到的页面是一张张加载
         * 出来的,这可行,但比较繁琐,我们可以直接使用CompletionService。
         * 结合 completionService 加载图像
         */
        public void loadWithCompeletionSevice(CharSequence source) {
    
            List<String> imageInfo = new ArrayList<>();
            // imageInfo = load from source.......
            
            CompletionService<ImageData> service = 
                    new ExecutorCompletionService<>(executorService);
            for (String string : imageInfo) {    
                service.submit(new Callable<CompletionTest.ImageData>() {
    
                    @Override
                    public ImageData call() throws Exception {
                        ImageData imageData = loadFrom(string);
                        // imageDate = loadFromSource();
                        return imageData;
                    }
                });
            }
            
            // loadText(source)
            System.out.println("loading text");
            
            try {
                for (int i = 0; i < imageInfo.size(); i++) {
                    // 在得出结果之前阻塞
                    Future<ImageData> future = service.take();
                    ImageData imageData = future.get();
                    
                    // loading image ...
                    System.out.println("loading image" + imageData);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }        
        }    
    }
    View Code

    3.2.     为任务设定时限

    有时候,我们可能无法在指定的时间内完成某个任务,那么我们将不需要它的结果,此时我们可以放弃这个任务。例如,一个web应用程序需要从外部的广告服务器获取广告,当如果该应用程序在2秒内得不到响应,那么将显示一个默认的广告,这样即使无法获取广告信息,也不会降低站点的性能。

    程序示例

    public class GetAd {
    
        class Ad{
            /*
             * 属性....
             */
        }
        
        private final ExecutorService execute = Executors.newFixedThreadPool(100);
        private final Ad DEFAULT_AD = new Ad();
        
        private final long TIME_LOAD = 2000;
        public void loadAd() {
            
            long endTime = System.currentTimeMillis() + TIME_LOAD;
            Future<Ad> future = execute.submit(new FetchAdTask());
            
            // loading page text.....
            
            Ad ad;
            try {
                // leftTime 有可能为负数,但 future.get 方法会把负数视为0
                long leftTime = endTime - System.currentTimeMillis();
                ad = future.get(leftTime, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                
                Thread.currentThread().interrupt();
                future.cancel(true);
                ad = DEFAULT_AD;
                
            } catch (TimeoutException e) {
                
                future.cancel(true);
                ad = DEFAULT_AD;
                
            } catch (ExecutionException e) {
                ad = DEFAULT_AD;
            }
            
            System.out.println("load complete" + ad);
        }
        
        class FetchAdTask implements Callable<Ad>{
    
            @Override
            public Ad call() throws Exception {
                // load ad from ad server
                return new Ad();
            }
        
        }
    }
    View Code

    总结自《java并发编程实战》

  • 相关阅读:
    Atitit s2018.6 s6 doc list on com pc.docx Atitit s2018.6 s6 doc list on com pc.docx  Aitit algo fix 算法系列补充.docx Atiitt 兼容性提示的艺术 attilax总结.docx Atitit 应用程序容器化总结 v2 s66.docx Atitit file cms api
    Atitit s2018.5 s5 doc list on com pc.docx  v2
    Atitit s2018.5 s5 doc list on com pc.docx  Acc 112237553.docx Acc baidu netdisk.docx Acc csdn 18821766710 attilax main num.docx Atiitt put post 工具 开发工具dev tool test.docx Atiitt 腾讯图像分类相册管家.docx
    Atitit s2018 s4 doc list dvchomepc dvccompc.docx .docx s2018 s4 doc compc dtS44 s2018 s4 doc dvcCompc dtS420 s2018 s4f doc homepc s2018 s4 doc compc dtS44(5 封私信 _ 44 条消息)WebSocket 有没有可能取代 AJAX
    Atitit s2018 s3 doc list alldvc.docx .docx s2018 s3f doc compc s2018 s3f doc homepc sum doc dvcCompc dtS312 s2018 s3f doc compcAtitit PathUtil 工具新特性新版本 v8 s312.docx s2018 s3f doc compcAtitit 操作日
    Atitit s2018.2 s2 doc list on home ntpc.docx  Atiitt uke制度体系 法律 法规 规章 条例 国王诏书.docx Atiitt 手写文字识别 讯飞科大 语音云.docx Atitit 代码托管与虚拟主机.docx Atitit 企业文化 每日心灵 鸡汤 值班 发布.docx Atitit 几大研发体系对比 Stage-Gat
    Atitit 文员招募规范 attilax总结
    Atitit r2017 r6 doc list on home ntpc.docx
    atitit r9 doc on home ntpc .docx
    Atitit.如何文章写好 论文 文章 如何写好论文 技术博客 v4
  • 原文地址:https://www.cnblogs.com/X-huang/p/11254519.html
Copyright © 2011-2022 走看看