zoukankan      html  css  js  c++  java
  • java线程池技术

    1.线程池的实现原理?
    简介: 多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
    假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。


    线程池技术正是关注如何缩短或调整T1、T3时间的技术,从而提高服务器程序性能的。它把T1、T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1、T3的开销了。
    线程池不仅调整T1、T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
        假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

    2.Spring中的线程池ThreadPoolTaskExecutor
    ThreadPoolTaskExecutor是一个spring的线程池技术,它是使用jdk中的java.util.concurrent.ThreadPoolExecutor进行实现。
    int corePoolSize:线程池维护线程的最小数量. 
      int maximumPoolSize:线程池维护线程的最大数量. 
      long keepAliveTime:空闲线程的存活时间. 
      TimeUnit unit: 时间单位,现有纳秒,微秒,毫秒,秒枚举值. 
      BlockingQueue<Runnable> workQueue:持有等待执行的任务队列. 
      RejectedExecutionHandler handler: 用来拒绝一个任务的执行,有两种情况会发生这种情况:
      一是在execute方法中若addIfUnderMaximumPoolSize(command)为false,即线程池已经饱和; 
      二是在execute方法中, 发现runState!=RUNNING || poolSize == 0,即已经shutdown,就调用ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用reject。
    一、初始化
    方式1.直接调用:
    1.ThreadPoolTaskExecutor  poolTaskExecutor = new  ThreadPoolTaskExecutor();  
    2.//线程池所使用的缓冲队列  
    3.poolTaskExecutor.setQueueCapacity(200);  
    4.//线程池维护线程的最少数量  
    5.poolTaskExecutor.setCorePoolSize(5);  
    6.//线程池维护线程的最大数量  
    7.poolTaskExecutor.setMaxPoolSize(1000);  
    8.//线程池维护线程所允许的空闲时间  
    9.poolTaskExecutor.setKeepAliveSeconds(30000);  
    10.poolTaskExecutor.initialize();  
    方式2.配置文件:

    程序里获取:
    ApplicationContext ctx =  new ClassPathXmlApplicationContext("applicationContext.xml");
    ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor)ctx.getBean("threadPool");
    二、利用线程池启动线程:
    Thread udpThread = new Thread(udp);//创建一个线程
    poolTaskExecutor.execute(udpThread);//把线程添加到线程池中
    //获取当前线程池活动的线程数:
    int count = poolTaskExecutor.getActiveCount();
    logger.debug("[x] - now threadpool active threads totalNum : " +count);
    三、配置解释即ThreadPoolTaskExecutor的处理流程:
    当一个任务通过execute(Runnable)方法欲添加到线程池时:
    1、当线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
    2、当线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
    3、当线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
    4、当线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
    5、当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
    其会优先创建  CorePoolSize 线程, 当继续增加线程时,先放入Queue中,当 CorePoolSize 和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错误 org.springframework.core.task.TaskRejectedException。
    另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。
    Reject策略预定义有四种: 
    (1)ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。 
    (2)ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. 
    (3)ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃. 
    (4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程).

    3.ThreadPoolExecutor策略配置以及应用场景,在不同的业务场景下线程池参数的设计
    三种常用的ThreadPoolExecutor:Executor是提供了一组工厂方法用于创建常用的ExecutorService,分别是FixedThreadPool,SingleThreadExecutor以及CachedThreadPool。这三种ThreadPoolExecutor都是调用ThreadPoolExecutor构造函数进行创建,区别在于参数不同。
    FixedThreadPool - 线程池大小固定,任务队列无界
    1.从线程池中获取可用线程执行任务,如果没有可用线程则使用ThreadFactory创建新的线程,直到线程数达到nThreads;
    2.线程池线程数达到nThreads以后,新的任务将被放入队列
    优点:能够保证所有的任务都被执行,永远不会拒绝新的任务
    缺点:队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题
    SingleThreadExecutor---线程池大小固定为1,任务队列无界
    优点:适用于在逻辑上需要单线程处理任务的场景,同事无界的LinkedBlockingQueue保证新任务都能够放入队列,不会被拒绝
    缺点:和FixedThreadPool相同,当处理任务无限等待的时候会造成内存问题
    CachedThreadPool---线程池无限大(Integer.MAX_VALUE),等待队列长度为1
    核心线程数是0,意味着所有任务会先入队列;最大线程数是Integer.MAX_VALUE,可以认为线程数量是没有限制的。KeepAlive时间被设置成60秒,意味着在没有任务的时候线程等待60秒以后退出。CachedThreadPool对任务的处理策略是提交的任务会立即分配一个线程进行执行,线程池中线程数量会随着任务数的变化自动扩张和缩减,在任务执行时间无限延长的极端情况下会创建过多的线程。

    总结
    对于需要保证所有提交的任务都要被执行的情况,使用FixedThreadPool
    如果限定只能使用一个线程进行任务处理,使用SingleThreadExecutor
    如果希望提交的任务尽快分配线程执行,使用CachedThreadPool
    如果业务上允许任务执行失败,或者任务执行过程可能出现执行时间过长进而影响其他业务的应用场景,可以通过使用限定线程数量的线程池以及限定长度的队列进行容错处理。
    高并发,低耗时,考虑用newCachedThreadPool,低并发,高耗时,考虑用newFixedThreadPool或者newSingleThreadPool.它们实现的思想刚好跟你不一样。高并发高耗时可以考虑用异步执行。
    排队有三种通用策略:
    1.直接提交:工作队列默认选项是SynchronousQueue,它将任务直接提交给线程而不保持它们。在此如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界,即maxPoolSize为Integer.MAX_VALUE,以避免拒绝新提交的任务。
    2.无界队列:使用无界队列(如LinkedBlockingQueue)将导致在所有corePoolSize线程都忙时,新任务在队列中等待。这样,创建的线程就不会超过corePoolSize。(因此,maxPoolSize的值也就无效了)当每个任务完全独立于其他任务,即任务执行互不影响时,适用于使用无界队列。
    3.有界队列:当使用有线的maxPoolSize时,有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折中:使用大型队列和小型池可以最大限制地降低CPU使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  
    到这里,该了解的理论已经够多了,可以调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。
    例子一:使用直接提交策略,也即SynchronousQueue。
    首先SynchronousQueue是无界的,也就是说他存储任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。
    我们使用一下参数构造ThreadPoolExecutor:
    1.new ThreadPoolExecutor(  
    2.            2, 3, 30, TimeUnit.SECONDS,   
    3.            new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),   
    4.            new RecorderThreadFactory("CookieRecorderPool"),   
    5.            new ThreadPoolExecutor.CallerRunsPolicy());  

    当核心线程已经有2个正在运行.
    1.此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
    2.又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
    3.此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。
    4.暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。
    所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
    什么意思?如果你的任务A1、A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中。
    例子二:使用无界队列策略,即LinkedBlockingQueue
    这个就拿newFixedThreadPool来说,根据前文提到的规则:
    如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
    那么当任务继续增加,会发生什么呢?
    如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
    OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?
    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
    这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了,呵呵。
    例子三:有界队列,使用ArrayBlockingQueue。
    这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。
    1.new ThreadPoolExecutor(  
    2.            2, 4, 30, TimeUnit.SECONDS,   
    3.            new ArrayBlockingQueue<Runnable>(2),   
    4.            new RecorderThreadFactory("CookieRecorderPool"),   
    5.            new ThreadPoolExecutor.CallerRunsPolicy());  
    假设,所有的任务都永远无法执行完。
    对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queu中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。
    总结:
    1.ThreadPoolExecutor的使用还是很有技巧的。
    2.使用无界queue可能会耗尽系统资源。
    3.使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小
    4.线程数自然也有开销,所以需要根据不同应用进行调节。
    keepAliveTime:
    jdk中的解释是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
    什么意思?接着上面的解释,后来向老板派来的工人始终是“借来的”,俗话说“有借就有还”,但这里的问题就是什么时候还了,如果借来的工人刚完成一个任务就还回去,后来发现任务还有,那岂不是又要去借?这一来一往,老板肯定头也大死了。
    合理的策略:既然借了,那就多借一会儿。直到“某一段”时间后,发现再也用不到这些工人时,便可以还回去了。这里的某一段时间便是keepAliveTime的含义,TimeUnit为keepAliveTime值的度量。
    RejectedExecutionHandler
    另一种情况便是,即使向老板借了工人,但是任务还是继续过来,还是忙不过来,这时整个队伍只好拒绝接受了。
    RejectedExecutionHandler接口提供了对于拒绝任务的处理的自定方法的机会。在ThreadPoolExecutor中已经默认包含了4中策略,因为源码非常简单,这里直接贴出来。
    CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    1.public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    2.            if (!e.isShutdown()) {  
    3.                r.run();  
    4.            }  
    5.        }  
    这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该的execute线程本身来执行。
    AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
    1.public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    2.            throw new RejectedExecutionException();  
    3.        }  
     这种策略直接抛出异常,丢弃任务。
    DiscardPolicy:不能执行的任务将被删除
    1.public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    2.        }  
     这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
    DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
    1.public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
    2.            if (!e.isShutdown()) {  
    3.                e.getQueue().poll();  
    4.                e.execute(r);  
    5.            }  
    6.        }  
    该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
    设想:如果其他线程都还在运行,那么新来任务踢掉旧任务,缓存在queue中,再来一个任务又会踢掉queue中最老任务。
    总结:
    keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。
    反之,如果核心数较小,有界BlockingQueue数值又较小,同时keepAliveTime又设的很小,如果任务频繁,那么系统就会频繁的申请回收线程。

  • 相关阅读:
    POI Excel表格合并,边框设置
    MYSQL中group_concat有长度限制!默认1024(转载)
    MARQUEE 字符滚动条效果
    <A>标签电子邮件链接
    <A>标签锚标记
    <hr> 水平样式分隔线
    sudo gem install cocoapods 没反应问题
    适配iPhone6和iPhone6 Plus
    同步推是如何给未越狱的IOS设备安装任意IPA的?
    据说是百度ios面试题
  • 原文地址:https://www.cnblogs.com/huaiyinxiaojiang/p/7586682.html
Copyright © 2011-2022 走看看