zoukankan      html  css  js  c++  java
  • java并发线程池---了解ThreadPoolExecutor就够了

    总结:线程池的特点是,在线程的数量=corePoolSize后,仅任务队列满了之后,才会从任务队列中取出一个任务,然后构造一个新的线程,循环往复直到线程数量达到maximumPoolSize执行拒绝策略。

    线程池-intsmaze

    线程池的思想是:在系统中开辟一块区域,其中存放一些待命的线程,这个区域被称为线程池。如果有需要执行的任务,则从线程池中借一个待命的线程来执行指定的任务,到任务结束可以再将所借线程归还。这样就避免了大量重复创建线程对象,浪费CPU,内存资源。

    自定义线程池-intsmaze

    如果观察jdk提供的各种线程池的源码实现可以发现,除了jdk8新增的线程池newWorkStealingPool以外,都是基于对ThreadPoolExecutor的封装实现,所以首先讲解ThreadPoolExecutor的具体功能。

    ThreadPoolExecutor详解-intsmaze

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    corePoolSize:指定线程池中线程数量

    maximumPoolSize:最大线程数量

    keepAliveTime:线程数量超过corePoolSize时,多于的空闲线程的存活时间(超过这段时间,该空闲线程会被销毁)。

    unit:keepAliveTime的时间单位

    workQueue:任务队列,提交但是未被执行的任务

    threadFactory:创建线程的线程工厂,默认即可

    handler:拒绝策略,当任务太多来不及处理,如何拒绝任务,默认为new AbortPolicy()策略。
            ExecutorService es = new ThreadPoolExecutor(3, 8, 60L,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                    Executors.defaultThreadFactory(),
                    new RejectedExecutionHandler() {
                        public void rejectedExecution(Runnable r,
                                ThreadPoolExecutor executor) {
                            System.out.println("discard");
                        }
                    });

    任务队列--存放runnable对象-intsmaze

    总结:线程池的特点是,在线程的数量=corePoolSize后,仅任务队列满了之后,才会从任务队列中取出一个任务,然后构造一个新的线程,循环往复直到线程数量达到maximumPoolSize执行拒绝策略。
     
    只要队列实现BlockingQueue接口即可,注意ConcurrentLinkedQueue实现的最顶层的queue接口所以不能用在这里
    常用的有如下:
    SynchronousQueue:直接提交队列,该队列没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。所以他不保存任务,总是将任务提交给线程执行,如果没有空闲的线程,则创建新的线程,当线程数量达到最大,则执行拒绝策略。
     
    ArrayBlockingQueue:有界任务队列,线程池的线程数小于corePoolSize,则创建新的线程,大于corePoolSize,则将新的任务加入等待队列。若等待队列已满,则在总线程不大于maximumPoolSize下,创建新的线程执行任务,大于maximumPoolSize则执行拒绝策略。
     
    LinkedBlockingQueue:无界队列,除非系统资源耗尽,否则不存在任务入队失败的情况。线程池的线程数小于corePoolSize,则创建新的线程,大于corePoolSize,则将新的任务加入等待队列。
     
    PriorityBlockingQueue优先任务队列,可以控制任务的执行先后顺序,是无界队列。ArrayBlockingQueue,LinkedBlockingQueue都是按照先进先出算法处理任务的,PriorityBlockingQueue可以根据任务自身的优先顺序先后执行。

    拒绝策略-intsmaze

    线程池中的线程用完了,同时等待队列中的任务已经塞满了,再也塞不下新任务了,就需要拒绝策略:处理任务数量超过系统实际承受能力时,处理方式。
    jdk内置四种拒绝策略:
    AbortPolicy:直接抛出异常(默认策略),就算线程池有空闲了,后面的线程也无法在运行了,要想后面的线程可以运行,要捕获异常信息。

    CallerRunsPolicy:该策略直接在调用者线程中运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是任务提交线程的性能极有可能会急剧下降。

    DiscardOldestPolicy:将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

    DiscardPolicy:默默丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这可能是最好的一种解决方案。在线程池不空闲的时候,提交的任务都将丢弃,当有空闲的线程时提交的任务会执行。

    下面是jdk的拒绝策略源码-intsmaze

       public static class CallerRunsPolicy implements RejectedExecutionHandler {
    
            public CallerRunsPolicy() { }
    
            /**
             * 直接在调用者线程中运行当前被丢弃的任务,要注意这里是调用Runnable的run()方法,而不是start()方法启动线程,run()以普通方法的形式在主线程中执行任务,会阻塞
    * 后面es.submit(new MyTask(i))方法的执行
    */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } /** * 将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }

    总结:AbortPolicy策略下,我们要catch异常,这样我们可以捕获到哪些任务被丢弃了。如果采用其他的策略,丢弃的任务无法定位的,只能通过下列程序中es.submit(new MyTask(i));任务之前打印该任务,运行任务的run()逻辑是,在打印任务信息,两处日志比对来定位哪些任务被丢弃了。

    public class MyTask implements Runnable
    {
    
        private int number;
        
        public MyTask(int number) {
            super();
            this.number = number;
        }
    
        public void run() {
            System.out.println(System.currentTimeMillis()+"thread id:"+Thread.currentThread().getId()+"==="+number);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
        public static void main(String[] args)  {
    //        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS, 
    //                new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
            
    //        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
    //                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
            
    //        ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
    //                new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
            
            ExecutorService es=new ThreadPoolExecutor(5,5,60L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
            for(int i=0;i<10000;i++)
            {
                try {
                    System.out.println(i);
                    es.submit(new MyTask(i));
                    Thread.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("------------------------"+i);
                }
            }
        }

    线程池执行逻辑源码解析-intsmaze

          public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        
           /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *如果少于corePoolSize线程正在运行,首先尝试用给定的命令启动一个新的线程任务。 
             自动调用addWorker检查runState和workerCount,
             
             
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *如果任务可以成功排队,那么我们仍然需要
               仔细检查我们是否应该添加一个线程
              (因为现有的自从上次检查后死亡)或者那个
              自进入该方法以来,该池关闭。 所以我们
              重新检查状态,如果有必要的话回滚队列
              停止,或者如果没有的话就开始一个新的线程。
             
             
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);//队列满了,执行拒绝策略
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
        
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);//这里就是调用我们传入的拒绝策略对象的方法
        }
        
         /**
         * Dispatch an uncaught exception to the handler. This method is
         * intended to be called only by the JVM.
         */
        private void dispatchUncaughtException(Throwable e) {
            getUncaughtExceptionHandler().uncaughtException(this, e);
        }

    常见线程池的构造方法-intsmaze

    newFixedThreadPoo-intsmaze

    任务队列为LinkedBlockingQueue中(长度无限),线程数量和最大线程数量相同。功能参考前面的任务队列总结。

    ExecutorService es=Executors.newFixedThreadPool(5);//参数同时指定线程池中线程数量为5,最大线程数量为5public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    newSingleThreadExecutor-intsmaze

    任务队列LinkedBlockingQueue中(长度无限),线程数量和最大线程数量均为1。
    ExecutorService es=Executors.newSingleThreadExecutor();//线程池中线程数量和最大线程数量均为1.public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    newCachedThreadPool-intsmaze

    任务队列为SynchronousQueue,线程数量为0,最大线程数量为Integer.MAX_VALUE,所以只要有任务没有空闲线程就会创建就新线程。
    ExecutorService es=Executors.newCachedThreadPool();
    //指定线程池中线程数量为0,最大线程数量为Integer.MAX_VALUE,任务队列为SynchronousQueuepublic static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    newScheduledThreadPool- -定时线程-intsmaze

    任务队列为new DelayedWorkQueue(),返回的对象在ExecutorService接口上扩展了在指定时间执行某认为的功能,在某个固定的延时之后执行或周期性执行某个任务。 

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
    }
    
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
    }

    newSingleThreadScheduledExecutor- -定时线程-intsmaze

    相当于newScheduledThreadPool(int corePoolSize)corePoolSize设置为1。

    ScheduledExecutorService es=Executors.newSingleThreadScheduledExecutor();

    延迟线程池

    class MyScheduledTask implements Runnable
    {
     private String tname;
     public MyScheduledTask(String tname)
     {
      this.tname=tname;
     }
     public void run()
     {
      System.out.println(tname+"任务时延2秒执行!!!");
     }
    }
    public class intsmaze
    {
     public static void main(String[] args)
     {
      ScheduledExecutorService scheduledThreadPool
                           =Executors.newScheduledThreadPool(2);
      MyScheduledTask mt1=new MyScheduledTask("MT1");
      scheduledThreadPool.schedule(mt1,2,TimeUnit.SECONDS);
    
     }
    }

    newWorkStealingPool java8新增连接池-intsmaze

        public static ExecutorService newWorkStealingPool(int parallelism) {
            return new ForkJoinPool
                (parallelism,
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }//创建指定数量的线程池来执行给定的并行级别,还会使用多个队列减少竞争
        public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }//前一个方法的简化,如果当前机器有4个CPU,则目标的并行级别被设置为4。

    关闭线程池(很少使用,除了切换数据源时需要控制)-intsmaze

    希望程序执行完所有任务后退出,调用ExecutorService接口中的shutdown(),shutdownNow()方法。
    用完一个线程池后,应该调用该线程池的shutdown方法,将启动线程池的关闭序列。调用shutdown方法后,线程池不在接收新的任务,但是会将以前所有已经提交的任务执行完。当线程池中的所有任务都执行完后,线程池中的所有线程都会死亡;shutdownNow方法会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

    线程池优化-intsmaze

    一般来说确定线程池的大小需要考虑CPU数量,内存大小,JDBC连接等因素。在《java并发编程实践》一书中给出了一个估算线程池大小的经验公式:
    Ncpu=CPU的数量
    Ucpu=目标CPU的使用率,0<=Ucpu<=1
    W/C=等待时间与计算时间的比率
    为保持处理器达到期望的使用率,最优的线程池的大小等于:
    Nthreads=Ncpu*Ucpu*(1+W/C)
    在java中,可以通过
    Runtime.getRuntime().availableProcessors()

    取得可以CPU数量。

  • 相关阅读:
    Java8 lambda表达式语法 1
    Spring WebMVC 4.1返回json时 406(Not Acceptable)
    上传 第三方jar包 nexus
    Nexus 使用配置
    Nexus 安装 使用说明
    mysql 常用命令
    JedisPoolConfig配置
    tomcat 管理端 安全措施
    Java ReentrantLock和synchronized两种锁定机制的对比
    spring 在web容器启动时执行初始化方法
  • 原文地址:https://www.cnblogs.com/intsmaze/p/9432199.html
Copyright © 2011-2022 走看看