zoukankan      html  css  js  c++  java
  • 为什么要用线程池?

    为什么要用线程池1?

    服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。

    构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务。实际上,对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显。

    每个请求对应一个线程(thread-per-request)方法的不足之一是:为每个请求创建一个新线程的开销很大;为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。除了创建和销毁线程的开销之外,活动的线程也消耗系统资源(线程的生命周期!)。在一个JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或“切换过度”。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目。

    线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

    为什么要使用线程池2?


    操作系统创建线程、切换线程状态、终结线程都要进行CPU调度——这是一个耗费时间和系统资源的事情。

    大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的。这种技术背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程、切换线程状态、销毁线程这些操作所占用,用于业务请求处理的资源反而减少了。所以最理想的处理方式是,将处理请求的线程数量控制在一个范围,既保证后续的请求不会等待太长时间,又保证物理机将足够的资源用于请求处理本身。

    另外,一些操作系统是有最大线程数量限制的。当运行的线程数量逼近这个值的时候,操作系统会变得不稳定。这也是我们要限制线程数量的原因。

    ----------------

    线程池的替代方案

    线程池远不是服务器应用程序内使用多线程的唯一方法。如同上面所提到的,有时,为每个新任务生成一个新线程是十分明智的。然而,如果任务创建过于频繁而任务的平均处理时间过短,那么为每个任务生成一个新线程将会导致性能问题

    另一个常见的线程模型是为某一类型的任务分配一个后台线程与任务队列。

    每个任务对应一个线程方法和单个后台线程(single-background-thread)方法在某些情形下都工作得非常理想。每个任务一个线程方法在只有少量运行时间很长的任务时工作得十分好。而只要调度可预见性不是很重要,则单个后台线程方法就工作得十分好,如低优先级后台任务就是这种情况。然而,大多数服务器应用程序都是面向处理大量的短期任务或子任务,因此往往希望具有一种能够以低开销有效地处理这些任务的机制以及一些资源管理和定时可预见性的措施。线程池提供了这些优点。


    ----------------

    工作队列

    就线程池的实际实现方式而言,术语“线程池”有些使人误解,因为线程池“明显的”实现在大多数情形下并不一定产生我们希望的结果。术语“线程池”先于 Java 平台出现,因此它可能是较少面向对象方法的产物。然而,该术语仍继续广泛应用着。

    虽然我们可以轻易地实现一个线程池类,其中客户机类等待一个可用线程、将任务传递给该线程以便执行、然后在任务完成时将线程归还给池,但这种方法却存在几个潜在的负面影响。例如在池为空时,会发生什么呢?试图向池线程传递任务的调用者都会发现池为空,在调用者等待一个可用的池线程时,它的线程将阻塞。我们之所以要使用后台线程的原因之一常常是为了防止正在提交的线程被阻塞。完全堵住调用者,如在线程池的“明显的”实现的情况,可以杜绝我们试图解决的问题的发生。

    我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。清单 1 显示了简单的合用工作队列的示例。尽管 Thread API 没有对使用 Runnable 接口强加特殊要求,但使用 Runnable 对象队列的这种模式是调度程序和工作队列的公共约定。

    ----------------

    使用线程池的风险

    虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足,并发错误,线程泄漏,请求过载。

    ----------------

    有效使用线程池的准则

    只要您遵循几条简单的准则,线程池可以成为构建服务器应用程序的极其有效的方法:

    (1)不要对那些同步等待其它任务结果的任务排队。这可能会导致上面所描述的那种形式的死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为所有的线程都很忙。

    (2)在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得 某些 进展。

    理解任务。要有效地调整线程池大小,您需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?您的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队 列可能会有意义,这样可以相应地调整每个池。

    ----------------

    调整池的大小

    调整线程池的大小基本上就是避免两类错误:线程太少或线程太多。幸运的是,对于大多数应用程序来说,太多和太少之间的余地相当宽。

    线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有 N 或 N+1 个线程时一般会获得最大的 CPU 利用率。

    处理器利用率不是调整线程池大小过程中的唯一考虑事项。随着线程池的增长,您可能会碰到调度程序、可用内存方面的限制,或者其它系统资源方面的限制,例如套接字、打开的文件句柄或数据库连接等的数目。

    ----------------

    无须编写您自己的池

    Doug Lea 编写了一个优秀的并发实用程序开放源码库 util.concurrent ,它包括互斥、信号量、诸如在并发访问下执行得很好的队列和散列表之类集合类以及几个工作队列实现。该包中的 PooledExecutor 类是一种有效的、广泛使用的以工作队列为基础的线程池的正确实现。

    参考:http://blog.csdn.net/ichsonx/article/details/6265071 java 线程池 较详细文摘
    参考:http://www.ibm.com/developerworks/cn/java/l-threadPool/ 线程池的介绍及简单实现

    Java语言为我们提供了两种基础线程池的选择:ScheduledThreadPoolExecutor 和 ThreadPoolExecutor。它们都实现了ExecutorService接口(注意,ExecutorService接口本身和“线程池”并没有直接关系,它的定义更接近“执行器”,而“使用线程管理的方式进行实现”只是其中的一种实现方式)。这篇文章中,我们主要围绕ThreadPoolExecutor类进行讲解。

    ----------------

    线程池的创建

    我们可以通过java.util.concurrent.ThreadPoolExecutor来创建一个线程池。

    常用构造方法为:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)

    1. corePoolSize: 线程池维护线程的最少数量
    2. maximumPoolSize:线程池维护线程的最大数量
    3. keepAliveTime: 线程池维护线程所允许的空闲时间
    4. unit: 线程池维护线程所允许的空闲时间的单位
    5. workQueue: 线程池所使用的缓冲队列
    6. handler: 线程池对拒绝任务的处理策略

    处理过程
    当一个任务通过execute(Runnable)方法欲添加到线程池时:

    1. 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
    2. 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
    3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
    4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。

    处理任务的优先级为
    核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

    当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

    handler有四个选择

    1. ThreadPoolExecutor.AbortPolicy() :抛出java.util.concurrent.RejectedExecutionException异常
    2. ThreadPoolExecutor.CallerRunsPolicy() : 重试添加当前的任务,他会自动重复调用execute()方法
    3. ThreadPoolExecutor.DiscardOldestPolicy() : 抛弃旧的任务
    4. ThreadPoolExecutor.DiscardPolicy() : 抛弃当前的任务

     举例:

    import java.io.Serializable;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class TestThreadPool2{
    
        public static void main(String[] args){
    
            // 构造一个线程池(池中最少有2个线程,最多4个线程,池中的线程允许空闲3秒,线程池所使用的缓冲队列ArrayBlockingQueue且容量为3,线程池对拒绝任务的处理策略为:抛弃旧的任务)
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            for (int i = 1; i <= 10; i++){
                try{
                    // 产生一个任务,并将其加入到线程池
                    String task = "task@ " + i;
                    System.out.println("put " + task);
                    threadPool.execute(new ThreadPoolTask(task));
    
                    // 便于观察,等待一段时间
                    Thread.sleep(2);
    
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 线程池执行的任务
     */
    class ThreadPoolTask implements Runnable, Serializable{
    
        private static final long serialVersionUID = 0;
    
        // 保存任务所需要的数据
        private Object threadPoolTaskData;
    
        ThreadPoolTask(Object tasks){
            this.threadPoolTaskData = tasks;
        }
    
        public void run(){
    
            // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
            System.out.println(Thread.currentThread().getName());
            System.out.println("start .." + threadPoolTaskData);
    
            try{
                // //便于观察,等待一段时间
                Thread.sleep(1000);
    
            }catch (Exception e){
                e.printStackTrace();
            }
            threadPoolTaskData = null;
        }
    
        public Object getTask(){
            return this.threadPoolTaskData;
        }
    }
    View Code

    说明:

    1. 在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
    2. 一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
    3. 在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。

    这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。

      4、通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制,改变这两个值就可以观察不同速率下程序的工作情况。
      5、通过调整4中所指的数据,再加上调整任务丢弃策略,换上其他三种策略,就可以看出不同策略下的不同处理方式。
      6、对于其他的使用方法,参看jdk的帮助,很容易理解和使用。

    再举一个例子:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolExecutorTest {
    
        private static int queueDeep = 4;
    
        public void createThreadPool(){
    
            /* 
             * 创建线程池,最小线程数为2,最大线程数为4,线程池维护线程的空闲时间为3秒, 
             * 使用队列深度为4的有界队列,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除, 
             * 然后重试执行程序(如果再次失败,则重复此过程),里面已经根据队列深度对任务加载进行了控制。 
             */ 
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueDeep),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            // 向线程池中添加 10 个任务
            for (int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                while (getQueueSize(tpe.getQueue()) >= queueDeep){
    
                    System.out.println("队列已满,等3秒再添加任务");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
                TaskThreadPool ttp = new TaskThreadPool(i);
                System.out.println("put i:" + i);
                tpe.execute(ttp);
            }
    
            tpe.shutdown();
        }
    
        private synchronized int getQueueSize(Queue queue) {
            return queue.size();
        }
    
        public static void main(String[] args) {
            ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
            test.createThreadPool();
        }
    
        class TaskThreadPool implements Runnable {
    
            private int index;
    
            public TaskThreadPool(int index) {
                this.index = index;
            }
    
            public void run() {
                System.out.println(Thread.currentThread() + " index:" + index);
                try{
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    View Code

    spring线程池ThreadPoolExecutor配置并且得到任务执行的结果

    Java并发包:ExecutorService和ThreadPoolExecutor

    Java并发包:Java Fork and Join using ForkJoinPool

    Java并发包:Exchanger和Semaphore

    Java并发包:Lock和ReadWriteLock

    Java并发包:ScheduledExecutorService(一种安排任务执行的ExecutorService)

    ----------------

    合理的配置线程池

    要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

    1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
    2. 任务的优先级:高,中和低。
    3. 任务的执行时间:长,中和短。
    4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

    任务性质不同的任务可以用不同规模的线程池分开处理。

    1. CPU密集型任务配置尽可能小的线程:如配置(N个cpu+1)个线程的线程池。
    2. IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如(2*Ncpu)。
    3. 混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。

      我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。

    它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

    执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

    依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

    建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。

    有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。

    如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

    当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

    Java线程池

    Java编程中“为了性能”尽量要做到的一些地方

  • 相关阅读:
    Python操作Excel
    JMeter生成UUID方式
    JMeter之Beanshell用法
    JMeter后置处理器
    JMeter后置处理器
    Python之正则匹配 re库
    read(),readline() 和 readlines() 比较
    Python的位置参数、默认参数、关键字参数、可变参数之间的区别
    调查管理系统 -(6)自定义Struts2的拦截器&自定义UserAware接口&Action中模型赋值问题&Hibernate懒加载问题
    调查管理系统 -(5)Struts2配置&用户注册/登录/校验
  • 原文地址:https://www.cnblogs.com/xinxindiandeng/p/6383311.html
Copyright © 2011-2022 走看看